diff --git a/core/queue/balancedpusher.go b/core/queue/balancedpusher.go index 32bc2ba2..fd29dfab 100644 --- a/core/queue/balancedpusher.go +++ b/core/queue/balancedpusher.go @@ -7,14 +7,17 @@ import ( "github.com/tal-tech/go-zero/core/logx" ) +// ErrNoAvailablePusher indicates no pusher available. var ErrNoAvailablePusher = errors.New("no available pusher") +// A BalancedPusher is used to push messages to multiple pusher with round robin algorithm. type BalancedPusher struct { name string pushers []Pusher index uint64 } +// NewBalancedPusher returns a BalancedPusher. func NewBalancedPusher(pushers []Pusher) Pusher { return &BalancedPusher{ name: generateName(pushers), @@ -22,10 +25,12 @@ func NewBalancedPusher(pushers []Pusher) Pusher { } } +// Name returns the name of pusher. func (pusher *BalancedPusher) Name() string { return pusher.name } +// Push pushes message to one of the underlying pushers. func (pusher *BalancedPusher) Push(message string) error { size := len(pusher.pushers) diff --git a/core/queue/consumer.go b/core/queue/consumer.go index 8f12d97f..56f07661 100644 --- a/core/queue/consumer.go +++ b/core/queue/consumer.go @@ -1,10 +1,12 @@ package queue type ( + // A Consumer interface represents a consumer that can consume string messages. Consumer interface { Consume(string) error OnEvent(event interface{}) } + // ConsumerFactory defines the factory to generate consumers. ConsumerFactory func() (Consumer, error) ) diff --git a/core/queue/messagequeue.go b/core/queue/messagequeue.go index 569ae566..e050c778 100644 --- a/core/queue/messagequeue.go +++ b/core/queue/messagequeue.go @@ -1,5 +1,6 @@ package queue +// A MessageQueue interface represents a message queue. type MessageQueue interface { Start() Stop() diff --git a/core/queue/multipusher.go b/core/queue/multipusher.go index b9df5c7c..fcff873b 100644 --- a/core/queue/multipusher.go +++ b/core/queue/multipusher.go @@ -2,11 +2,13 @@ package queue import "github.com/tal-tech/go-zero/core/errorx" +// A MultiPusher is a pusher that can push messages to multiple underlying pushers. type MultiPusher struct { name string pushers []Pusher } +// NewMultiPusher returns a MultiPusher. func NewMultiPusher(pushers []Pusher) Pusher { return &MultiPusher{ name: generateName(pushers), @@ -14,10 +16,12 @@ func NewMultiPusher(pushers []Pusher) Pusher { } } +// Name returns the name of pusher. func (pusher *MultiPusher) Name() string { return pusher.name } +// Push pushes a message into the underlying queue. func (pusher *MultiPusher) Push(message string) error { var batchError errorx.BatchError diff --git a/core/queue/producer.go b/core/queue/producer.go index c0ca935d..e2b132b0 100644 --- a/core/queue/producer.go +++ b/core/queue/producer.go @@ -1,15 +1,18 @@ package queue type ( + // A Producer interface represents a producer that produces messages. Producer interface { AddListener(listener ProduceListener) Produce() (string, bool) } + // A ProduceListener interface represents a produce listener. ProduceListener interface { OnProducerPause() OnProducerResume() } + // ProducerFactory defines the method to generate a Producer. ProducerFactory func() (Producer, error) ) diff --git a/core/queue/queue.go b/core/queue/queue.go index ec917bec..f6dd85cf 100644 --- a/core/queue/queue.go +++ b/core/queue/queue.go @@ -16,6 +16,7 @@ import ( const queueName = "queue" type ( + // A Queue is a message queue. Queue struct { name string metrics *stat.Metrics @@ -33,24 +34,28 @@ type ( eventChannels []chan interface{} } + // A Listener interface represents a listener that can be notified with queue events. Listener interface { OnPause() OnResume() } + // A Poller interface wraps the method Poll. Poller interface { Name() string Poll() string } + // A Pusher interface wraps the method Push. Pusher interface { Name() string Push(string) error } ) +// NewQueue returns a Queue. func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue { - queue := &Queue{ + q := &Queue{ metrics: stat.NewMetrics(queueName), producerFactory: producerFactory, producerRoutineGroup: threading.NewRoutineGroup(), @@ -61,58 +66,65 @@ func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) channel: make(chan string), quit: make(chan struct{}), } - queue.SetName(queueName) + q.SetName(queueName) - return queue + return q } -func (queue *Queue) AddListener(listener Listener) { - queue.listeners = append(queue.listeners, listener) +// AddListener adds a litener to q. +func (q *Queue) AddListener(listener Listener) { + q.listeners = append(q.listeners, listener) } -func (queue *Queue) Broadcast(message interface{}) { +// Broadcast broadcasts message to all event channels. +func (q *Queue) Broadcast(message interface{}) { go func() { - queue.eventLock.Lock() - defer queue.eventLock.Unlock() + q.eventLock.Lock() + defer q.eventLock.Unlock() - for _, channel := range queue.eventChannels { + for _, channel := range q.eventChannels { channel <- message } }() } -func (queue *Queue) SetName(name string) { - queue.name = name - queue.metrics.SetName(name) +// SetName sets the name of q. +func (q *Queue) SetName(name string) { + q.name = name + q.metrics.SetName(name) } -func (queue *Queue) SetNumConsumer(count int) { - queue.consumerCount = count +// SetNumConsumer sets the number of consumers. +func (q *Queue) SetNumConsumer(count int) { + q.consumerCount = count } -func (queue *Queue) SetNumProducer(count int) { - queue.producerCount = count +// SetNumProducer sets the number of producers. +func (q *Queue) SetNumProducer(count int) { + q.producerCount = count } -func (queue *Queue) Start() { - queue.startProducers(queue.producerCount) - queue.startConsumers(queue.consumerCount) +// Start starts q. +func (q *Queue) Start() { + q.startProducers(q.producerCount) + q.startConsumers(q.consumerCount) - queue.producerRoutineGroup.Wait() - close(queue.channel) - queue.consumerRoutineGroup.Wait() + q.producerRoutineGroup.Wait() + close(q.channel) + q.consumerRoutineGroup.Wait() } -func (queue *Queue) Stop() { - close(queue.quit) +// Stop stops q. +func (q *Queue) Stop() { + close(q.quit) } -func (queue *Queue) consume(eventChan chan interface{}) { +func (q *Queue) consume(eventChan chan interface{}) { var consumer Consumer for { var err error - if consumer, err = queue.consumerFactory(); err != nil { + if consumer, err = q.consumerFactory(); err != nil { logx.Errorf("Error on creating consumer: %v", err) time.Sleep(time.Second) } else { @@ -122,9 +134,9 @@ func (queue *Queue) consume(eventChan chan interface{}) { for { select { - case message, ok := <-queue.channel: + case message, ok := <-q.channel: if ok { - queue.consumeOne(consumer, message) + q.consumeOne(consumer, message) } else { logx.Info("Task channel was closed, quitting consumer...") return @@ -135,12 +147,12 @@ func (queue *Queue) consume(eventChan chan interface{}) { } } -func (queue *Queue) consumeOne(consumer Consumer, message string) { +func (q *Queue) consumeOne(consumer Consumer, message string) { threading.RunSafe(func() { startTime := timex.Now() defer func() { duration := timex.Since(startTime) - queue.metrics.Add(stat.Task{ + q.metrics.Add(stat.Task{ Duration: duration, }) logx.WithDuration(duration).Infof("%s", message) @@ -152,18 +164,18 @@ func (queue *Queue) consumeOne(consumer Consumer, message string) { }) } -func (queue *Queue) pause() { - for _, listener := range queue.listeners { +func (q *Queue) pause() { + for _, listener := range q.listeners { listener.OnPause() } } -func (queue *Queue) produce() { +func (q *Queue) produce() { var producer Producer for { var err error - if producer, err = queue.producerFactory(); err != nil { + if producer, err = q.producerFactory(); err != nil { logx.Errorf("Error on creating producer: %v", err) time.Sleep(time.Second) } else { @@ -171,53 +183,53 @@ func (queue *Queue) produce() { } } - atomic.AddInt32(&queue.active, 1) + atomic.AddInt32(&q.active, 1) producer.AddListener(routineListener{ - queue: queue, + queue: q, }) for { select { - case <-queue.quit: + case <-q.quit: logx.Info("Quitting producer") return default: - if v, ok := queue.produceOne(producer); ok { - queue.channel <- v + if v, ok := q.produceOne(producer); ok { + q.channel <- v } } } } -func (queue *Queue) produceOne(producer Producer) (string, bool) { +func (q *Queue) produceOne(producer Producer) (string, bool) { // avoid panic quit the producer, just log it and continue defer rescue.Recover() return producer.Produce() } -func (queue *Queue) resume() { - for _, listener := range queue.listeners { +func (q *Queue) resume() { + for _, listener := range q.listeners { listener.OnResume() } } -func (queue *Queue) startConsumers(number int) { +func (q *Queue) startConsumers(number int) { for i := 0; i < number; i++ { eventChan := make(chan interface{}) - queue.eventLock.Lock() - queue.eventChannels = append(queue.eventChannels, eventChan) - queue.eventLock.Unlock() - queue.consumerRoutineGroup.Run(func() { - queue.consume(eventChan) + q.eventLock.Lock() + q.eventChannels = append(q.eventChannels, eventChan) + q.eventLock.Unlock() + q.consumerRoutineGroup.Run(func() { + q.consume(eventChan) }) } } -func (queue *Queue) startProducers(number int) { +func (q *Queue) startProducers(number int) { for i := 0; i < number; i++ { - queue.producerRoutineGroup.Run(func() { - queue.produce() + q.producerRoutineGroup.Run(func() { + q.produce() }) } }