initial import
kq/config.go (new file, 21 lines)
@@ -0,0 +1,21 @@
package kq

import "zero/core/service"

const (
    firstOffset = "first"
    lastOffset  = "last"
)

// KqConf is the configuration for kq queues and pushers.
type KqConf struct {
    service.ServiceConf
    Brokers      []string
    Group        string
    Topic        string
    Offset       string `json:",options=first|last,default=last"`
    NumConns     int    `json:",default=1"`
    NumProducers int    `json:",default=8"`
    NumConsumers int    `json:",default=8"`
    MinBytes     int    `json:",default=10240"`    // 10K
    MaxBytes     int    `json:",default=10485760"` // 10M
}
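For illustration, a minimal sketch of building this config by hand (broker address, group, and topic are placeholder values; in practice the struct is usually unmarshaled from a config file, which is also when the `default=` tags take effect — a hand-built literal leaves unset fields at their zero values):

package main

import "zero/kq"

func main() {
    // Hypothetical values for illustration only.
    c := kq.KqConf{
        Brokers:      []string{"localhost:9092"},
        Group:        "group-a",
        Topic:        "events",
        Offset:       "first",
        NumConns:     1,
        NumProducers: 8,
        NumConsumers: 8,
        MinBytes:     10240,    // 10K
        MaxBytes:     10485760, // 10M
    }
    _ = c
}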
kq/pusher.go (new file, 101 lines)
@@ -0,0 +1,101 @@
package kq

import (
    "context"
    "strconv"
    "time"

    "zero/core/executors"
    "zero/core/logx"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/snappy"
)

type (
    PushOption func(options *chunkOptions)

    // A Pusher pushes messages to a kafka topic, batching them
    // through a ChunkExecutor.
    Pusher struct {
        producer *kafka.Writer
        topic    string
        executor *executors.ChunkExecutor
    }

    chunkOptions struct {
        chunkSize     int
        flushInterval time.Duration
    }
)

// NewPusher returns a Pusher that writes to topic on the given brokers.
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
    producer := kafka.NewWriter(kafka.WriterConfig{
        Brokers:          addrs,
        Topic:            topic,
        Balancer:         &kafka.LeastBytes{},
        CompressionCodec: snappy.NewCompressionCodec(),
    })

    pusher := &Pusher{
        producer: producer,
        topic:    topic,
    }
    pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
        chunk := make([]kafka.Message, len(tasks))
        for i := range tasks {
            chunk[i] = tasks[i].(kafka.Message)
        }
        if err := pusher.producer.WriteMessages(context.Background(), chunk...); err != nil {
            logx.Error(err)
        }
    }, newOptions(opts)...)

    return pusher
}

// Close closes the underlying kafka writer.
func (p *Pusher) Close() error {
    return p.producer.Close()
}

// Name returns the topic that the Pusher writes to.
func (p *Pusher) Name() string {
    return p.topic
}

// Push sends v to the topic, keyed by the current timestamp in nanoseconds.
func (p *Pusher) Push(v string) error {
    msg := kafka.Message{
        Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
        Value: []byte(v),
    }
    if p.executor != nil {
        return p.executor.Add(msg, len(v))
    }

    return p.producer.WriteMessages(context.Background(), msg)
}

// WithChunkSize customizes the Pusher with the given chunk size in bytes.
func WithChunkSize(chunkSize int) PushOption {
    return func(options *chunkOptions) {
        options.chunkSize = chunkSize
    }
}

// WithFlushInterval customizes the Pusher with the given flush interval.
func WithFlushInterval(interval time.Duration) PushOption {
    return func(options *chunkOptions) {
        options.flushInterval = interval
    }
}

func newOptions(opts []PushOption) []executors.ChunkOption {
    var options chunkOptions
    for _, opt := range opts {
        opt(&options)
    }

    var chunkOpts []executors.ChunkOption
    if options.chunkSize > 0 {
        chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
    }
    if options.flushInterval > 0 {
        chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
    }
    return chunkOpts
}
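A minimal usage sketch of the Pusher (broker address, topic, and payload are placeholders). Note that when batching is enabled, Push hands the message to the chunk executor, so a message may sit buffered until the chunk-size or flush-interval threshold is reached:

package main

import (
    "log"
    "time"

    "zero/kq"
)

func main() {
    // Placeholder broker and topic.
    pusher := kq.NewPusher([]string{"localhost:9092"}, "events",
        kq.WithChunkSize(1<<20),           // flush once ~1MB of messages is buffered
        kq.WithFlushInterval(time.Second), // or at least once per second
    )
    defer pusher.Close()

    if err := pusher.Push(`{"hello":"world"}`); err != nil {
        log.Fatal(err)
    }
}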
kq/queue.go (new file, 229 lines)
@@ -0,0 +1,229 @@
package kq

import (
    "context"
    "io"
    "log"
    "time"

    "zero/core/logx"
    "zero/core/queue"
    "zero/core/service"
    "zero/core/stat"
    "zero/core/threading"
    "zero/core/timex"

    "github.com/segmentio/kafka-go"
    _ "github.com/segmentio/kafka-go/gzip"
    _ "github.com/segmentio/kafka-go/lz4"
    _ "github.com/segmentio/kafka-go/snappy"
)

const (
    defaultCommitInterval = time.Second
    defaultMaxWait        = time.Second
)

type (
    ConsumeHandle func(key, value string) error

    ConsumeHandler interface {
        Consume(key, value string) error
    }

    queueOptions struct {
        commitInterval time.Duration
        maxWait        time.Duration
        metrics        *stat.Metrics
    }

    QueueOption func(*queueOptions)

    kafkaQueue struct {
        c                KqConf
        consumer         *kafka.Reader
        handler          ConsumeHandler
        channel          chan kafka.Message
        producerRoutines *threading.RoutineGroup
        consumerRoutines *threading.RoutineGroup
        metrics          *stat.Metrics
    }

    kafkaQueues struct {
        queues []queue.MessageQueue
        group  *service.ServiceGroup
    }
)

// MustNewQueue returns a queue that consumes the configured topic,
// exiting the process on error.
func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue {
    q, err := NewQueue(c, handler, opts...)
    if err != nil {
        log.Fatal(err)
    }

    return q
}

// NewQueue returns a queue that consumes the configured topic with the given handler.
func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) {
    if err := c.SetUp(); err != nil {
        return nil, err
    }

    var options queueOptions
    for _, opt := range opts {
        opt(&options)
    }
    ensureQueueOptions(c, &options)

    if c.NumConns < 1 {
        c.NumConns = 1
    }
    q := kafkaQueues{
        group: service.NewServiceGroup(),
    }
    for i := 0; i < c.NumConns; i++ {
        q.queues = append(q.queues, newKafkaQueue(c, handler, options))
    }

    return q, nil
}

func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue {
    var offset int64
    if c.Offset == firstOffset {
        offset = kafka.FirstOffset
    } else {
        offset = kafka.LastOffset
    }
    consumer := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        c.Brokers,
        GroupID:        c.Group,
        Topic:          c.Topic,
        StartOffset:    offset,
        MinBytes:       c.MinBytes, // 10KB
        MaxBytes:       c.MaxBytes, // 10MB
        MaxWait:        options.maxWait,
        CommitInterval: options.commitInterval,
    })

    return &kafkaQueue{
        c:                c,
        consumer:         consumer,
        handler:          handler,
        channel:          make(chan kafka.Message),
        producerRoutines: threading.NewRoutineGroup(),
        consumerRoutines: threading.NewRoutineGroup(),
        metrics:          options.metrics,
    }
}

func (q *kafkaQueue) Start() {
    q.startConsumers()
    q.startProducers()

    q.producerRoutines.Wait()
    close(q.channel)
    q.consumerRoutines.Wait()
}

func (q *kafkaQueue) Stop() {
    q.consumer.Close()
    logx.Close()
}

func (q *kafkaQueue) consumeOne(key, val string) error {
    startTime := timex.Now()
    err := q.handler.Consume(key, val)
    q.metrics.Add(stat.Task{
        Duration: timex.Since(startTime),
    })
    return err
}

// startConsumers starts the goroutines that drain the channel and
// hand each message to the handler.
func (q *kafkaQueue) startConsumers() {
    for i := 0; i < q.c.NumConsumers; i++ {
        q.consumerRoutines.Run(func() {
            for msg := range q.channel {
                if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
                    logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err)
                }
            }
        })
    }
}

// startProducers starts the goroutines that read from kafka and feed the channel.
func (q *kafkaQueue) startProducers() {
    for i := 0; i < q.c.NumProducers; i++ {
        q.producerRoutines.Run(func() {
            for {
                msg, err := q.consumer.ReadMessage(context.Background())
                // io.EOF means the consumer was closed.
                // io.ErrClosedPipe means messages were being committed on a closed
                // consumer; kafka will redeliver the uncommitted messages, so ignore.
                if err == io.EOF || err == io.ErrClosedPipe {
                    return
                }
                if err != nil {
                    logx.Errorf("Error on reading message, %q", err.Error())
                    continue
                }
                q.channel <- msg
            }
        })
    }
}

func (q kafkaQueues) Start() {
    for _, each := range q.queues {
        q.group.Add(each)
    }
    q.group.Start()
}

func (q kafkaQueues) Stop() {
    q.group.Stop()
}

// WithCommitInterval customizes how often offsets are committed back to kafka.
func WithCommitInterval(interval time.Duration) QueueOption {
    return func(options *queueOptions) {
        options.commitInterval = interval
    }
}

// WithHandle wraps a plain function as a ConsumeHandler.
func WithHandle(handle ConsumeHandle) ConsumeHandler {
    return innerConsumeHandler{
        handle: handle,
    }
}

// WithMaxWait customizes how long the reader waits for new data before polling again.
func WithMaxWait(wait time.Duration) QueueOption {
    return func(options *queueOptions) {
        options.maxWait = wait
    }
}

// WithMetrics customizes the queue with the given metrics collector.
func WithMetrics(metrics *stat.Metrics) QueueOption {
    return func(options *queueOptions) {
        options.metrics = metrics
    }
}

type innerConsumeHandler struct {
    handle ConsumeHandle
}

func (ch innerConsumeHandler) Consume(k, v string) error {
    return ch.handle(k, v)
}

func ensureQueueOptions(c KqConf, options *queueOptions) {
    if options.commitInterval == 0 {
        options.commitInterval = defaultCommitInterval
    }
    if options.maxWait == 0 {
        options.maxWait = defaultMaxWait
    }
    if options.metrics == nil {
        options.metrics = stat.NewMetrics(c.Name)
    }
}
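A minimal consumer sketch tying the pieces together (broker, group, and topic are placeholder values, and c.SetUp() inside NewQueue must succeed for the queue to start; NumConns is bumped to at least 1 automatically):

package main

import (
    "fmt"

    "zero/kq"
)

func main() {
    // Placeholder config; in practice KqConf usually comes from a config file.
    c := kq.KqConf{
        Brokers: []string{"localhost:9092"},
        Group:   "group-a",
        Topic:   "events",
    }

    q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
        fmt.Printf("key: %s, value: %s\n", k, v)
        return nil
    }))
    defer q.Stop()

    q.Start() // blocks, reading messages and dispatching them to the handler
}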