fix golint issues in core/queue (#507)

This commit is contained in:
Kevin Wan
2021-02-22 16:38:42 +08:00
committed by GitHub
parent 6e4c98e52d
commit 497762ab47
6 changed files with 78 additions and 51 deletions

View File

@@ -7,14 +7,17 @@ import (
"github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
) )
// ErrNoAvailablePusher indicates no pusher available.
var ErrNoAvailablePusher = errors.New("no available pusher") var ErrNoAvailablePusher = errors.New("no available pusher")
// A BalancedPusher is used to push messages to multiple pusher with round robin algorithm.
type BalancedPusher struct { type BalancedPusher struct {
name string name string
pushers []Pusher pushers []Pusher
index uint64 index uint64
} }
// NewBalancedPusher returns a BalancedPusher.
func NewBalancedPusher(pushers []Pusher) Pusher { func NewBalancedPusher(pushers []Pusher) Pusher {
return &BalancedPusher{ return &BalancedPusher{
name: generateName(pushers), name: generateName(pushers),
@@ -22,10 +25,12 @@ func NewBalancedPusher(pushers []Pusher) Pusher {
} }
} }
// Name returns the name of pusher.
func (pusher *BalancedPusher) Name() string { func (pusher *BalancedPusher) Name() string {
return pusher.name return pusher.name
} }
// Push pushes message to one of the underlying pushers.
func (pusher *BalancedPusher) Push(message string) error { func (pusher *BalancedPusher) Push(message string) error {
size := len(pusher.pushers) size := len(pusher.pushers)

View File

@@ -1,10 +1,12 @@
package queue package queue
type ( type (
// A Consumer interface represents a consumer that can consume string messages.
Consumer interface { Consumer interface {
Consume(string) error Consume(string) error
OnEvent(event interface{}) OnEvent(event interface{})
} }
// ConsumerFactory defines the factory to generate consumers.
ConsumerFactory func() (Consumer, error) ConsumerFactory func() (Consumer, error)
) )

View File

@@ -1,5 +1,6 @@
package queue package queue
// A MessageQueue interface represents a message queue.
type MessageQueue interface { type MessageQueue interface {
Start() Start()
Stop() Stop()

View File

@@ -2,11 +2,13 @@ package queue
import "github.com/tal-tech/go-zero/core/errorx" import "github.com/tal-tech/go-zero/core/errorx"
// A MultiPusher is a pusher that can push messages to multiple underlying pushers.
type MultiPusher struct { type MultiPusher struct {
name string name string
pushers []Pusher pushers []Pusher
} }
// NewMultiPusher returns a MultiPusher.
func NewMultiPusher(pushers []Pusher) Pusher { func NewMultiPusher(pushers []Pusher) Pusher {
return &MultiPusher{ return &MultiPusher{
name: generateName(pushers), name: generateName(pushers),
@@ -14,10 +16,12 @@ func NewMultiPusher(pushers []Pusher) Pusher {
} }
} }
// Name returns the name of pusher.
func (pusher *MultiPusher) Name() string { func (pusher *MultiPusher) Name() string {
return pusher.name return pusher.name
} }
// Push pushes a message into the underlying queue.
func (pusher *MultiPusher) Push(message string) error { func (pusher *MultiPusher) Push(message string) error {
var batchError errorx.BatchError var batchError errorx.BatchError

View File

@@ -1,15 +1,18 @@
package queue package queue
type ( type (
// A Producer interface represents a producer that produces messages.
Producer interface { Producer interface {
AddListener(listener ProduceListener) AddListener(listener ProduceListener)
Produce() (string, bool) Produce() (string, bool)
} }
// A ProduceListener interface represents a produce listener.
ProduceListener interface { ProduceListener interface {
OnProducerPause() OnProducerPause()
OnProducerResume() OnProducerResume()
} }
// ProducerFactory defines the method to generate a Producer.
ProducerFactory func() (Producer, error) ProducerFactory func() (Producer, error)
) )

View File

@@ -16,6 +16,7 @@ import (
const queueName = "queue" const queueName = "queue"
type ( type (
// A Queue is a message queue.
Queue struct { Queue struct {
name string name string
metrics *stat.Metrics metrics *stat.Metrics
@@ -33,24 +34,28 @@ type (
eventChannels []chan interface{} eventChannels []chan interface{}
} }
// A Listener interface represents a listener that can be notified with queue events.
Listener interface { Listener interface {
OnPause() OnPause()
OnResume() OnResume()
} }
// A Poller interface wraps the method Poll.
Poller interface { Poller interface {
Name() string Name() string
Poll() string Poll() string
} }
// A Pusher interface wraps the method Push.
Pusher interface { Pusher interface {
Name() string Name() string
Push(string) error Push(string) error
} }
) )
// NewQueue returns a Queue.
func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue { func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue {
queue := &Queue{ q := &Queue{
metrics: stat.NewMetrics(queueName), metrics: stat.NewMetrics(queueName),
producerFactory: producerFactory, producerFactory: producerFactory,
producerRoutineGroup: threading.NewRoutineGroup(), producerRoutineGroup: threading.NewRoutineGroup(),
@@ -61,58 +66,65 @@ func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory)
channel: make(chan string), channel: make(chan string),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
queue.SetName(queueName) q.SetName(queueName)
return queue return q
} }
func (queue *Queue) AddListener(listener Listener) { // AddListener adds a litener to q.
queue.listeners = append(queue.listeners, listener) func (q *Queue) AddListener(listener Listener) {
q.listeners = append(q.listeners, listener)
} }
func (queue *Queue) Broadcast(message interface{}) { // Broadcast broadcasts message to all event channels.
func (q *Queue) Broadcast(message interface{}) {
go func() { go func() {
queue.eventLock.Lock() q.eventLock.Lock()
defer queue.eventLock.Unlock() defer q.eventLock.Unlock()
for _, channel := range queue.eventChannels { for _, channel := range q.eventChannels {
channel <- message channel <- message
} }
}() }()
} }
func (queue *Queue) SetName(name string) { // SetName sets the name of q.
queue.name = name func (q *Queue) SetName(name string) {
queue.metrics.SetName(name) q.name = name
q.metrics.SetName(name)
} }
func (queue *Queue) SetNumConsumer(count int) { // SetNumConsumer sets the number of consumers.
queue.consumerCount = count func (q *Queue) SetNumConsumer(count int) {
q.consumerCount = count
} }
func (queue *Queue) SetNumProducer(count int) { // SetNumProducer sets the number of producers.
queue.producerCount = count func (q *Queue) SetNumProducer(count int) {
q.producerCount = count
} }
func (queue *Queue) Start() { // Start starts q.
queue.startProducers(queue.producerCount) func (q *Queue) Start() {
queue.startConsumers(queue.consumerCount) q.startProducers(q.producerCount)
q.startConsumers(q.consumerCount)
queue.producerRoutineGroup.Wait() q.producerRoutineGroup.Wait()
close(queue.channel) close(q.channel)
queue.consumerRoutineGroup.Wait() q.consumerRoutineGroup.Wait()
} }
func (queue *Queue) Stop() { // Stop stops q.
close(queue.quit) func (q *Queue) Stop() {
close(q.quit)
} }
func (queue *Queue) consume(eventChan chan interface{}) { func (q *Queue) consume(eventChan chan interface{}) {
var consumer Consumer var consumer Consumer
for { for {
var err error var err error
if consumer, err = queue.consumerFactory(); err != nil { if consumer, err = q.consumerFactory(); err != nil {
logx.Errorf("Error on creating consumer: %v", err) logx.Errorf("Error on creating consumer: %v", err)
time.Sleep(time.Second) time.Sleep(time.Second)
} else { } else {
@@ -122,9 +134,9 @@ func (queue *Queue) consume(eventChan chan interface{}) {
for { for {
select { select {
case message, ok := <-queue.channel: case message, ok := <-q.channel:
if ok { if ok {
queue.consumeOne(consumer, message) q.consumeOne(consumer, message)
} else { } else {
logx.Info("Task channel was closed, quitting consumer...") logx.Info("Task channel was closed, quitting consumer...")
return return
@@ -135,12 +147,12 @@ func (queue *Queue) consume(eventChan chan interface{}) {
} }
} }
func (queue *Queue) consumeOne(consumer Consumer, message string) { func (q *Queue) consumeOne(consumer Consumer, message string) {
threading.RunSafe(func() { threading.RunSafe(func() {
startTime := timex.Now() startTime := timex.Now()
defer func() { defer func() {
duration := timex.Since(startTime) duration := timex.Since(startTime)
queue.metrics.Add(stat.Task{ q.metrics.Add(stat.Task{
Duration: duration, Duration: duration,
}) })
logx.WithDuration(duration).Infof("%s", message) logx.WithDuration(duration).Infof("%s", message)
@@ -152,18 +164,18 @@ func (queue *Queue) consumeOne(consumer Consumer, message string) {
}) })
} }
func (queue *Queue) pause() { func (q *Queue) pause() {
for _, listener := range queue.listeners { for _, listener := range q.listeners {
listener.OnPause() listener.OnPause()
} }
} }
func (queue *Queue) produce() { func (q *Queue) produce() {
var producer Producer var producer Producer
for { for {
var err error var err error
if producer, err = queue.producerFactory(); err != nil { if producer, err = q.producerFactory(); err != nil {
logx.Errorf("Error on creating producer: %v", err) logx.Errorf("Error on creating producer: %v", err)
time.Sleep(time.Second) time.Sleep(time.Second)
} else { } else {
@@ -171,53 +183,53 @@ func (queue *Queue) produce() {
} }
} }
atomic.AddInt32(&queue.active, 1) atomic.AddInt32(&q.active, 1)
producer.AddListener(routineListener{ producer.AddListener(routineListener{
queue: queue, queue: q,
}) })
for { for {
select { select {
case <-queue.quit: case <-q.quit:
logx.Info("Quitting producer") logx.Info("Quitting producer")
return return
default: default:
if v, ok := queue.produceOne(producer); ok { if v, ok := q.produceOne(producer); ok {
queue.channel <- v q.channel <- v
} }
} }
} }
} }
func (queue *Queue) produceOne(producer Producer) (string, bool) { func (q *Queue) produceOne(producer Producer) (string, bool) {
// avoid panic quit the producer, just log it and continue // avoid panic quit the producer, just log it and continue
defer rescue.Recover() defer rescue.Recover()
return producer.Produce() return producer.Produce()
} }
func (queue *Queue) resume() { func (q *Queue) resume() {
for _, listener := range queue.listeners { for _, listener := range q.listeners {
listener.OnResume() listener.OnResume()
} }
} }
func (queue *Queue) startConsumers(number int) { func (q *Queue) startConsumers(number int) {
for i := 0; i < number; i++ { for i := 0; i < number; i++ {
eventChan := make(chan interface{}) eventChan := make(chan interface{})
queue.eventLock.Lock() q.eventLock.Lock()
queue.eventChannels = append(queue.eventChannels, eventChan) q.eventChannels = append(q.eventChannels, eventChan)
queue.eventLock.Unlock() q.eventLock.Unlock()
queue.consumerRoutineGroup.Run(func() { q.consumerRoutineGroup.Run(func() {
queue.consume(eventChan) q.consume(eventChan)
}) })
} }
} }
func (queue *Queue) startProducers(number int) { func (q *Queue) startProducers(number int) {
for i := 0; i < number; i++ { for i := 0; i < number; i++ {
queue.producerRoutineGroup.Run(func() { q.producerRoutineGroup.Run(func() {
queue.produce() q.produce()
}) })
} }
} }