From 32f78668dbad0bae907a4085a0ac5f609704e7ac Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sat, 27 May 2023 23:57:33 +0800 Subject: [PATCH] chore: add more tests (#3290) --- core/queue/queue_test.go | 89 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 82 insertions(+), 7 deletions(-) diff --git a/core/queue/queue_test.go b/core/queue/queue_test.go index 5e5dd96a..d11c228d 100644 --- a/core/queue/queue_test.go +++ b/core/queue/queue_test.go @@ -1,6 +1,7 @@ package queue import ( + "errors" "sync" "sync/atomic" "testing" @@ -37,10 +38,82 @@ func TestQueue(t *testing.T) { assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) } +func TestQueue_Broadcast(t *testing.T) { + producer := newMockedProducer(rounds) + consumer := newMockedConsumer() + consumer.wait.Add(consumers) + q := NewQueue(func() (Producer, error) { + return producer, nil + }, func() (Consumer, error) { + return consumer, nil + }) + q.AddListener(new(mockedListener)) + q.SetName("mockqueue") + q.SetNumConsumer(consumers) + q.SetNumProducer(1) + q.Broadcast("message") + go func() { + producer.wait.Wait() + q.Stop() + }() + q.Start() + consumer.wait.Wait() + assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) + assert.Equal(t, int32(consumers), atomic.LoadInt32(&consumer.events)) +} + +func TestQueue_PauseResume(t *testing.T) { + producer := newMockedProducer(rounds) + consumer := newMockedConsumer() + consumer.wait.Add(consumers) + q := NewQueue(func() (Producer, error) { + return producer, nil + }, func() (Consumer, error) { + return consumer, nil + }) + q.AddListener(new(mockedListener)) + q.SetName("mockqueue") + q.SetNumConsumer(consumers) + q.SetNumProducer(1) + go func() { + producer.wait.Wait() + q.Stop() + }() + q.Start() + producer.listener.OnProducerPause() + assert.Equal(t, int32(0), atomic.LoadInt32(&q.active)) + producer.listener.OnProducerResume() + assert.Equal(t, int32(1), atomic.LoadInt32(&q.active)) + assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) +} + +func TestQueue_ConsumeError(t *testing.T) { + producer := newMockedProducer(rounds) + consumer := newMockedConsumer() + consumer.consumeErr = errors.New("consume error") + consumer.wait.Add(consumers) + q := NewQueue(func() (Producer, error) { + return producer, nil + }, func() (Consumer, error) { + return consumer, nil + }) + q.AddListener(new(mockedListener)) + q.SetName("mockqueue") + q.SetNumConsumer(consumers) + q.SetNumProducer(1) + go func() { + producer.wait.Wait() + q.Stop() + }() + q.Start() + assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) +} + type mockedConsumer struct { - count int32 - events int32 - wait sync.WaitGroup + count int32 + events int32 + consumeErr error + wait sync.WaitGroup } func newMockedConsumer() *mockedConsumer { @@ -49,7 +122,7 @@ func newMockedConsumer() *mockedConsumer { func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) - return nil + return c.consumeErr } func (c *mockedConsumer) OnEvent(any) { @@ -59,9 +132,10 @@ func (c *mockedConsumer) OnEvent(any) { } type mockedProducer struct { - total int32 - count int32 - wait sync.WaitGroup + total int32 + count int32 + listener ProduceListener + wait sync.WaitGroup } func newMockedProducer(total int32) *mockedProducer { @@ -72,6 +146,7 @@ func newMockedProducer(total int32) *mockedProducer { } func (p *mockedProducer) AddListener(listener ProduceListener) { + p.listener = listener } func (p *mockedProducer) Produce() (string, bool) {