refactor
This commit is contained in:
19
rq/internal/conf.go
Normal file
19
rq/internal/conf.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"zero/core/queue"
|
||||
"zero/core/stores/redis"
|
||||
)
|
||||
|
||||
type RedisKeyConf struct {
|
||||
redis.RedisConf
|
||||
Key string `json:",optional"`
|
||||
}
|
||||
|
||||
func (rkc RedisKeyConf) NewProducer(opts ...ProducerOption) (queue.Producer, error) {
|
||||
return newProducer(rkc.NewRedis(), rkc.Key, opts...)
|
||||
}
|
||||
|
||||
func (rkc RedisKeyConf) NewPusher(opts ...PusherOption) queue.QueuePusher {
|
||||
return NewPusher(rkc.NewRedis(), rkc.Key, opts...)
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package constant
|
||||
package internal
|
||||
|
||||
const (
|
||||
Delimeter = "/"
|
||||
@@ -1,4 +1,4 @@
|
||||
package rq
|
||||
package internal
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
6
rq/internal/message.go
Normal file
6
rq/internal/message.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package internal
|
||||
|
||||
type TimedMessage struct {
|
||||
Time int64 `json:"time"`
|
||||
Payload string `json:"payload"`
|
||||
}
|
||||
82
rq/internal/redisqueue_test.go
Normal file
82
rq/internal/redisqueue_test.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"zero/core/logx"
|
||||
"zero/core/queue"
|
||||
"zero/core/stores/redis"
|
||||
|
||||
"github.com/alicebob/miniredis"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func init() {
|
||||
logx.Disable()
|
||||
}
|
||||
|
||||
func TestRedisQueue(t *testing.T) {
|
||||
const (
|
||||
total = 1000
|
||||
key = "queue"
|
||||
)
|
||||
r, err := miniredis.Run()
|
||||
assert.Nil(t, err)
|
||||
|
||||
c := RedisKeyConf{
|
||||
RedisConf: redis.RedisConf{
|
||||
Host: r.Addr(),
|
||||
Type: redis.NodeType,
|
||||
},
|
||||
Key: key,
|
||||
}
|
||||
|
||||
pusher := NewPusher(c.NewRedis(), key, WithTime())
|
||||
assert.True(t, len(pusher.Name()) > 0)
|
||||
for i := 0; i < total; i++ {
|
||||
err := pusher.Push(strconv.Itoa(i))
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
consumer := new(mockedConsumer)
|
||||
consumer.wait.Add(total)
|
||||
q := queue.NewQueue(func() (queue.Producer, error) {
|
||||
return c.NewProducer(TimeSensitive(5))
|
||||
}, func() (queue.Consumer, error) {
|
||||
return consumer, nil
|
||||
})
|
||||
q.SetNumProducer(1)
|
||||
q.SetNumConsumer(1)
|
||||
go func() {
|
||||
q.Start()
|
||||
}()
|
||||
consumer.wait.Wait()
|
||||
q.Stop()
|
||||
|
||||
var expect int
|
||||
for i := 0; i < total; i++ {
|
||||
expect ^= i
|
||||
}
|
||||
assert.Equal(t, expect, consumer.xor)
|
||||
}
|
||||
|
||||
type mockedConsumer struct {
|
||||
wait sync.WaitGroup
|
||||
xor int
|
||||
}
|
||||
|
||||
func (c *mockedConsumer) Consume(s string) error {
|
||||
val, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.xor ^= val
|
||||
c.wait.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockedConsumer) OnEvent(event interface{}) {
|
||||
}
|
||||
166
rq/internal/redisqueueproducer.go
Normal file
166
rq/internal/redisqueueproducer.go
Normal file
@@ -0,0 +1,166 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"zero/core/jsonx"
|
||||
"zero/core/logx"
|
||||
"zero/core/queue"
|
||||
"zero/core/stores/redis"
|
||||
)
|
||||
|
||||
const (
|
||||
logIntervalMillis = 1000
|
||||
retryRedisInterval = time.Second
|
||||
)
|
||||
|
||||
type (
|
||||
ProducerOption func(p queue.Producer) queue.Producer
|
||||
|
||||
RedisQueueProducer struct {
|
||||
name string
|
||||
store *redis.Redis
|
||||
key string
|
||||
redisNode redis.ClosableNode
|
||||
listeners []queue.ProduceListener
|
||||
}
|
||||
)
|
||||
|
||||
func NewProducerFactory(store *redis.Redis, key string, opts ...ProducerOption) queue.ProducerFactory {
|
||||
return func() (queue.Producer, error) {
|
||||
return newProducer(store, key, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RedisQueueProducer) AddListener(listener queue.ProduceListener) {
|
||||
p.listeners = append(p.listeners, listener)
|
||||
}
|
||||
|
||||
func (p *RedisQueueProducer) Name() string {
|
||||
return p.name
|
||||
}
|
||||
|
||||
func (p *RedisQueueProducer) Produce() (string, bool) {
|
||||
lessLogger := logx.NewLessLogger(logIntervalMillis)
|
||||
|
||||
for {
|
||||
value, ok, err := p.store.BlpopEx(p.redisNode, p.key)
|
||||
if err == nil {
|
||||
return value, ok
|
||||
} else if err == redis.Nil {
|
||||
// timed out without elements popped
|
||||
continue
|
||||
} else {
|
||||
lessLogger.Errorf("Error on blpop: %v", err)
|
||||
p.waitForRedisAvailable()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newProducer(store *redis.Redis, key string, opts ...ProducerOption) (queue.Producer, error) {
|
||||
redisNode, err := redis.CreateBlockingNode(store)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var producer queue.Producer = &RedisQueueProducer{
|
||||
name: fmt.Sprintf("%s/%s/%s", store.Type, store.Addr, key),
|
||||
store: store,
|
||||
key: key,
|
||||
redisNode: redisNode,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
producer = opt(producer)
|
||||
}
|
||||
|
||||
return producer, nil
|
||||
}
|
||||
|
||||
func (p *RedisQueueProducer) resetRedisConnection() error {
|
||||
if p.redisNode != nil {
|
||||
p.redisNode.Close()
|
||||
p.redisNode = nil
|
||||
}
|
||||
|
||||
redisNode, err := redis.CreateBlockingNode(p.store)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.redisNode = redisNode
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *RedisQueueProducer) waitForRedisAvailable() {
|
||||
var paused bool
|
||||
var pauseOnce sync.Once
|
||||
|
||||
for {
|
||||
if err := p.resetRedisConnection(); err != nil {
|
||||
pauseOnce.Do(func() {
|
||||
paused = true
|
||||
for _, listener := range p.listeners {
|
||||
listener.OnProducerPause()
|
||||
}
|
||||
})
|
||||
logx.Errorf("Error occurred while connect to redis: %v", err)
|
||||
time.Sleep(retryRedisInterval)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if paused {
|
||||
for _, listener := range p.listeners {
|
||||
listener.OnProducerResume()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TimeSensitive(seconds int64) ProducerOption {
|
||||
return func(p queue.Producer) queue.Producer {
|
||||
if seconds > 0 {
|
||||
return autoDropQueueProducer{
|
||||
seconds: seconds,
|
||||
producer: p,
|
||||
}
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
}
|
||||
|
||||
type autoDropQueueProducer struct {
|
||||
seconds int64 // seconds before to drop
|
||||
producer queue.Producer
|
||||
}
|
||||
|
||||
func (p autoDropQueueProducer) AddListener(listener queue.ProduceListener) {
|
||||
p.producer.AddListener(listener)
|
||||
}
|
||||
|
||||
func (p autoDropQueueProducer) Produce() (string, bool) {
|
||||
lessLogger := logx.NewLessLogger(logIntervalMillis)
|
||||
|
||||
for {
|
||||
content, ok := p.producer.Produce()
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
|
||||
var timedMsg TimedMessage
|
||||
if err := jsonx.UnmarshalFromString(content, &timedMsg); err != nil {
|
||||
lessLogger.Errorf("invalid timedMessage: %s, error: %s", content, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
if timedMsg.Time+p.seconds < time.Now().Unix() {
|
||||
lessLogger.Errorf("expired timedMessage: %s", content)
|
||||
}
|
||||
|
||||
return timedMsg.Payload, true
|
||||
}
|
||||
}
|
||||
78
rq/internal/redisqueuepusher.go
Normal file
78
rq/internal/redisqueuepusher.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"zero/core/jsonx"
|
||||
"zero/core/logx"
|
||||
"zero/core/queue"
|
||||
"zero/core/stores/redis"
|
||||
)
|
||||
|
||||
type (
|
||||
PusherOption func(p queue.QueuePusher) queue.QueuePusher
|
||||
|
||||
RedisQueuePusher struct {
|
||||
name string
|
||||
store *redis.Redis
|
||||
key string
|
||||
}
|
||||
)
|
||||
|
||||
func NewPusher(store *redis.Redis, key string, opts ...PusherOption) queue.QueuePusher {
|
||||
var pusher queue.QueuePusher = &RedisQueuePusher{
|
||||
name: fmt.Sprintf("%s/%s/%s", store.Type, store.Addr, key),
|
||||
store: store,
|
||||
key: key,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
pusher = opt(pusher)
|
||||
}
|
||||
|
||||
return pusher
|
||||
}
|
||||
|
||||
func (saver *RedisQueuePusher) Name() string {
|
||||
return saver.name
|
||||
}
|
||||
|
||||
func (saver *RedisQueuePusher) Push(message string) error {
|
||||
_, err := saver.store.Rpush(saver.key, message)
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
logx.Infof("<= %s", message)
|
||||
return nil
|
||||
}
|
||||
|
||||
func WithTime() PusherOption {
|
||||
return func(p queue.QueuePusher) queue.QueuePusher {
|
||||
return timedQueuePusher{
|
||||
pusher: p,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type timedQueuePusher struct {
|
||||
pusher queue.QueuePusher
|
||||
}
|
||||
|
||||
func (p timedQueuePusher) Name() string {
|
||||
return p.pusher.Name()
|
||||
}
|
||||
|
||||
func (p timedQueuePusher) Push(message string) error {
|
||||
tm := TimedMessage{
|
||||
Time: time.Now().Unix(),
|
||||
Payload: message,
|
||||
}
|
||||
|
||||
if content, err := jsonx.Marshal(tm); err != nil {
|
||||
return err
|
||||
} else {
|
||||
return p.pusher.Push(string(content))
|
||||
}
|
||||
}
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
"zero/core/hash"
|
||||
"zero/core/jsonx"
|
||||
"zero/rq/constant"
|
||||
"zero/rq/internal"
|
||||
)
|
||||
|
||||
var ErrInvalidServerChange = errors.New("not a server change message")
|
||||
@@ -82,7 +82,7 @@ func (sc ServerChange) GetCode() string {
|
||||
}
|
||||
|
||||
func IsServerChange(message string) bool {
|
||||
return len(message) > 0 && message[0] == constant.ServerSensitivePrefix
|
||||
return len(message) > 0 && message[0] == internal.ServerSensitivePrefix
|
||||
}
|
||||
|
||||
func (sc ServerChange) Marshal() (string, error) {
|
||||
@@ -91,7 +91,7 @@ func (sc ServerChange) Marshal() (string, error) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(append([]byte{constant.ServerSensitivePrefix}, body...)), nil
|
||||
return string(append([]byte{internal.ServerSensitivePrefix}, body...)), nil
|
||||
}
|
||||
|
||||
func UnmarshalServerChange(body string) (ServerChange, error) {
|
||||
23
rq/pusher.go
23
rq/pusher.go
@@ -14,11 +14,10 @@ import (
|
||||
"zero/core/lang"
|
||||
"zero/core/logx"
|
||||
"zero/core/queue"
|
||||
"zero/core/redisqueue"
|
||||
"zero/core/stores/redis"
|
||||
"zero/core/threading"
|
||||
"zero/rq/constant"
|
||||
"zero/rq/update"
|
||||
"zero/rq/internal"
|
||||
"zero/rq/internal/update"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -155,7 +154,7 @@ func (pusher *Pusher) failover(server string) error {
|
||||
return
|
||||
}
|
||||
|
||||
if option == constant.TimedQueueType {
|
||||
if option == internal.TimedQueueType {
|
||||
message, err = unwrapTimedMessage(message)
|
||||
if err != nil {
|
||||
logx.Errorf("invalid timedMessage: %s, error: %s", message, err.Error())
|
||||
@@ -179,11 +178,11 @@ func UnmarshalPusher(server string) (queue.QueuePusher, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if option == constant.TimedQueueType {
|
||||
return redisqueue.NewPusher(store, key, redisqueue.WithTime()), nil
|
||||
if option == internal.TimedQueueType {
|
||||
return internal.NewPusher(store, key, internal.WithTime()), nil
|
||||
}
|
||||
|
||||
return redisqueue.NewPusher(store, key), nil
|
||||
return internal.NewPusher(store, key), nil
|
||||
}
|
||||
|
||||
func WithBatchConsistentStrategy(keysFn KeysFn, assembleFn AssembleFn, opts ...discov.BalanceOption) PusherOption {
|
||||
@@ -375,15 +374,15 @@ func getName(key string) string {
|
||||
func newPusher(server string) (queue.QueuePusher, error) {
|
||||
if rds, key, option, err := newRedisWithKey(server); err != nil {
|
||||
return nil, err
|
||||
} else if option == constant.TimedQueueType {
|
||||
return redisqueue.NewPusher(rds, key, redisqueue.WithTime()), nil
|
||||
} else if option == internal.TimedQueueType {
|
||||
return internal.NewPusher(rds, key, internal.WithTime()), nil
|
||||
} else {
|
||||
return redisqueue.NewPusher(rds, key), nil
|
||||
return internal.NewPusher(rds, key), nil
|
||||
}
|
||||
}
|
||||
|
||||
func newRedisWithKey(server string) (rds *redis.Redis, key, option string, err error) {
|
||||
fields := strings.Split(server, constant.Delimeter)
|
||||
fields := strings.Split(server, internal.Delimeter)
|
||||
if len(fields) < etcdRedisFields {
|
||||
err = fmt.Errorf("wrong redis queue: %s, should be ip:port/type/password/key/[option]", server)
|
||||
return
|
||||
@@ -437,7 +436,7 @@ func broadcast(servers []string, message string) error {
|
||||
}
|
||||
|
||||
func unwrapTimedMessage(message string) (string, error) {
|
||||
var tm redisqueue.TimedMessage
|
||||
var tm internal.TimedMessage
|
||||
if err := jsonx.UnmarshalFromString(message, &tm); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
15
rq/queue.go
15
rq/queue.go
@@ -11,13 +11,12 @@ import (
|
||||
"zero/core/discov"
|
||||
"zero/core/logx"
|
||||
"zero/core/queue"
|
||||
"zero/core/redisqueue"
|
||||
"zero/core/service"
|
||||
"zero/core/stores/redis"
|
||||
"zero/core/stringx"
|
||||
"zero/core/threading"
|
||||
"zero/rq/constant"
|
||||
"zero/rq/update"
|
||||
"zero/rq/internal"
|
||||
"zero/rq/internal/update"
|
||||
)
|
||||
|
||||
const keyLen = 6
|
||||
@@ -107,8 +106,8 @@ func (q *MessageQueue) Stop() {
|
||||
|
||||
func (q *MessageQueue) buildQueue() *queue.Queue {
|
||||
inboundStore := redis.NewRedis(q.c.Redis.Host, q.c.Redis.Type, q.c.Redis.Pass)
|
||||
producerFactory := redisqueue.NewProducerFactory(inboundStore, q.c.Redis.Key,
|
||||
redisqueue.TimeSensitive(q.c.DropBefore))
|
||||
producerFactory := internal.NewProducerFactory(inboundStore, q.c.Redis.Key,
|
||||
internal.TimeSensitive(q.c.DropBefore))
|
||||
mq := queue.NewQueue(producerFactory, q.consumerFactory)
|
||||
|
||||
if len(q.c.Name) > 0 {
|
||||
@@ -140,7 +139,7 @@ func (q *MessageQueue) maybeAppendRenewer(group *service.ServiceGroup, mq *queue
|
||||
if len(q.c.Etcd.Hosts) > 0 || len(q.c.Etcd.Key) > 0 {
|
||||
etcdValue := MarshalQueue(q.c.Redis)
|
||||
if q.c.DropBefore > 0 {
|
||||
etcdValue = strings.Join([]string{etcdValue, constant.TimedQueueType}, constant.Delimeter)
|
||||
etcdValue = strings.Join([]string{etcdValue, internal.TimedQueueType}, internal.Delimeter)
|
||||
}
|
||||
keepAliver := discov.NewRenewer(q.c.Etcd.Hosts, q.c.Etcd.Key, etcdValue, q.options.renewId)
|
||||
mq.AddListener(pauseResumeHandler{
|
||||
@@ -156,7 +155,7 @@ func MarshalQueue(rds redis.RedisKeyConf) string {
|
||||
rds.Type,
|
||||
rds.Pass,
|
||||
rds.Key,
|
||||
}, constant.Delimeter)
|
||||
}, internal.Delimeter)
|
||||
}
|
||||
|
||||
func WithHandle(handle ConsumeHandle) queue.ConsumerFactory {
|
||||
@@ -251,7 +250,7 @@ func (c *serverSensitiveConsumer) Consume(msg string) error {
|
||||
|
||||
oldHash := change.CreatePrevHash()
|
||||
newHash := change.CreateCurrentHash()
|
||||
hashChange := NewHashChange(oldHash, newHash)
|
||||
hashChange := internal.NewHashChange(oldHash, newHash)
|
||||
c.mq.redisQueue.Broadcast(hashChange)
|
||||
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user