Compare commits
35 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6fdee77fa9 | ||
|
|
5f084fb7d2 | ||
|
|
c8ff9d2f23 | ||
|
|
78f5e7df87 | ||
|
|
6d8dc4630f | ||
|
|
03ac41438f | ||
|
|
4ef0b0a8ac | ||
|
|
87b1fba46c | ||
|
|
cfa6644b0c | ||
|
|
fcaebd73fb | ||
|
|
a26fc2b672 | ||
|
|
05c8dd0b9c | ||
|
|
d6c7da521e | ||
|
|
96b6d2ab58 | ||
|
|
88a73f1042 | ||
|
|
9428fface2 | ||
|
|
c637f86817 | ||
|
|
d4097af627 | ||
|
|
7da31921c7 | ||
|
|
47440964cd | ||
|
|
80d55dbc02 | ||
|
|
b541403ce2 | ||
|
|
a7c02414f3 | ||
|
|
196475383b | ||
|
|
fd75f700a2 | ||
|
|
d408a0d49b | ||
|
|
69d113a46d | ||
|
|
63c7f44a5f | ||
|
|
19888b7d11 | ||
|
|
d117e31993 | ||
|
|
10cd6053bc | ||
|
|
d1529fced8 | ||
|
|
4f59fd306a | ||
|
|
f77c73eec1 | ||
|
|
9b0b958f43 |
4
.github/workflows/go.yml
vendored
4
.github/workflows/go.yml
vendored
@@ -25,10 +25,6 @@ jobs:
|
||||
- name: Get dependencies
|
||||
run: |
|
||||
go get -v -t -d ./...
|
||||
if [ -f Gopkg.toml ]; then
|
||||
curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
|
||||
dep ensure
|
||||
fi
|
||||
|
||||
- name: Test
|
||||
run: go test -v -race ./...
|
||||
|
||||
44
core/queue/balancedqueuepusher.go
Normal file
44
core/queue/balancedqueuepusher.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
)
|
||||
|
||||
var ErrNoAvailablePusher = errors.New("no available pusher")
|
||||
|
||||
type BalancedQueuePusher struct {
|
||||
name string
|
||||
pushers []Pusher
|
||||
index uint64
|
||||
}
|
||||
|
||||
func NewBalancedQueuePusher(pushers []Pusher) Pusher {
|
||||
return &BalancedQueuePusher{
|
||||
name: generateName(pushers),
|
||||
pushers: pushers,
|
||||
}
|
||||
}
|
||||
|
||||
func (pusher *BalancedQueuePusher) Name() string {
|
||||
return pusher.name
|
||||
}
|
||||
|
||||
func (pusher *BalancedQueuePusher) Push(message string) error {
|
||||
size := len(pusher.pushers)
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
index := atomic.AddUint64(&pusher.index, 1) % uint64(size)
|
||||
target := pusher.pushers[index]
|
||||
|
||||
if err := target.Push(message); err != nil {
|
||||
logx.Error(err)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return ErrNoAvailablePusher
|
||||
}
|
||||
43
core/queue/balancedqueuepusher_test.go
Normal file
43
core/queue/balancedqueuepusher_test.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestBalancedQueuePusher(t *testing.T) {
|
||||
const numPushers = 100
|
||||
var pushers []Pusher
|
||||
var mockedPushers []*mockedPusher
|
||||
for i := 0; i < numPushers; i++ {
|
||||
p := &mockedPusher{
|
||||
name: "pusher:" + strconv.Itoa(i),
|
||||
}
|
||||
pushers = append(pushers, p)
|
||||
mockedPushers = append(mockedPushers, p)
|
||||
}
|
||||
|
||||
pusher := NewBalancedQueuePusher(pushers)
|
||||
assert.True(t, len(pusher.Name()) > 0)
|
||||
|
||||
for i := 0; i < numPushers*1000; i++ {
|
||||
assert.Nil(t, pusher.Push("item"))
|
||||
}
|
||||
|
||||
var counts []int
|
||||
for _, p := range mockedPushers {
|
||||
counts = append(counts, p.count)
|
||||
}
|
||||
mean := calcMean(counts)
|
||||
variance := calcVariance(mean, counts)
|
||||
assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance))
|
||||
}
|
||||
|
||||
func TestBalancedQueuePusher_NoAvailable(t *testing.T) {
|
||||
pusher := NewBalancedQueuePusher(nil)
|
||||
assert.True(t, len(pusher.Name()) == 0)
|
||||
assert.Equal(t, ErrNoAvailablePusher, pusher.Push("item"))
|
||||
}
|
||||
10
core/queue/consumer.go
Normal file
10
core/queue/consumer.go
Normal file
@@ -0,0 +1,10 @@
|
||||
package queue
|
||||
|
||||
type (
|
||||
Consumer interface {
|
||||
Consume(string) error
|
||||
OnEvent(event interface{})
|
||||
}
|
||||
|
||||
ConsumerFactory func() (Consumer, error)
|
||||
)
|
||||
6
core/queue/messagequeue.go
Normal file
6
core/queue/messagequeue.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package queue
|
||||
|
||||
type MessageQueue interface {
|
||||
Start()
|
||||
Stop()
|
||||
}
|
||||
31
core/queue/multiqueuepusher.go
Normal file
31
core/queue/multiqueuepusher.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package queue
|
||||
|
||||
import "github.com/tal-tech/go-zero/core/errorx"
|
||||
|
||||
type MultiQueuePusher struct {
|
||||
name string
|
||||
pushers []Pusher
|
||||
}
|
||||
|
||||
func NewMultiQueuePusher(pushers []Pusher) Pusher {
|
||||
return &MultiQueuePusher{
|
||||
name: generateName(pushers),
|
||||
pushers: pushers,
|
||||
}
|
||||
}
|
||||
|
||||
func (pusher *MultiQueuePusher) Name() string {
|
||||
return pusher.name
|
||||
}
|
||||
|
||||
func (pusher *MultiQueuePusher) Push(message string) error {
|
||||
var batchError errorx.BatchError
|
||||
|
||||
for _, each := range pusher.pushers {
|
||||
if err := each.Push(message); err != nil {
|
||||
batchError.Add(err)
|
||||
}
|
||||
}
|
||||
|
||||
return batchError.Err()
|
||||
}
|
||||
39
core/queue/multiqueuepusher_test.go
Normal file
39
core/queue/multiqueuepusher_test.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestMultiQueuePusher(t *testing.T) {
|
||||
const numPushers = 100
|
||||
var pushers []Pusher
|
||||
var mockedPushers []*mockedPusher
|
||||
for i := 0; i < numPushers; i++ {
|
||||
p := &mockedPusher{
|
||||
name: "pusher:" + strconv.Itoa(i),
|
||||
}
|
||||
pushers = append(pushers, p)
|
||||
mockedPushers = append(mockedPushers, p)
|
||||
}
|
||||
|
||||
pusher := NewMultiQueuePusher(pushers)
|
||||
assert.True(t, len(pusher.Name()) > 0)
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
_ = pusher.Push("item")
|
||||
}
|
||||
|
||||
var counts []int
|
||||
for _, p := range mockedPushers {
|
||||
counts = append(counts, p.count)
|
||||
}
|
||||
mean := calcMean(counts)
|
||||
variance := calcVariance(mean, counts)
|
||||
assert.True(t, math.Abs(mean-1000*(1-failProba)) < 10)
|
||||
assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance))
|
||||
}
|
||||
15
core/queue/producer.go
Normal file
15
core/queue/producer.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package queue
|
||||
|
||||
type (
|
||||
Producer interface {
|
||||
AddListener(listener ProduceListener)
|
||||
Produce() (string, bool)
|
||||
}
|
||||
|
||||
ProduceListener interface {
|
||||
OnProducerPause()
|
||||
OnProducerResume()
|
||||
}
|
||||
|
||||
ProducerFactory func() (Producer, error)
|
||||
)
|
||||
239
core/queue/queue.go
Normal file
239
core/queue/queue.go
Normal file
@@ -0,0 +1,239 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/rescue"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const queueName = "queue"
|
||||
|
||||
type (
|
||||
Queue struct {
|
||||
name string
|
||||
metrics *stat.Metrics
|
||||
producerFactory ProducerFactory
|
||||
producerRoutineGroup *threading.RoutineGroup
|
||||
consumerFactory ConsumerFactory
|
||||
consumerRoutineGroup *threading.RoutineGroup
|
||||
producerCount int
|
||||
consumerCount int
|
||||
active int32
|
||||
channel chan string
|
||||
quit chan struct{}
|
||||
listeners []Listener
|
||||
eventLock sync.Mutex
|
||||
eventChannels []chan interface{}
|
||||
}
|
||||
|
||||
Listener interface {
|
||||
OnPause()
|
||||
OnResume()
|
||||
}
|
||||
|
||||
Poller interface {
|
||||
Name() string
|
||||
Poll() string
|
||||
}
|
||||
|
||||
Pusher interface {
|
||||
Name() string
|
||||
Push(string) error
|
||||
}
|
||||
)
|
||||
|
||||
func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue {
|
||||
queue := &Queue{
|
||||
metrics: stat.NewMetrics(queueName),
|
||||
producerFactory: producerFactory,
|
||||
producerRoutineGroup: threading.NewRoutineGroup(),
|
||||
consumerFactory: consumerFactory,
|
||||
consumerRoutineGroup: threading.NewRoutineGroup(),
|
||||
producerCount: runtime.NumCPU(),
|
||||
consumerCount: runtime.NumCPU() << 1,
|
||||
channel: make(chan string),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
queue.SetName(queueName)
|
||||
|
||||
return queue
|
||||
}
|
||||
|
||||
func (queue *Queue) AddListener(listener Listener) {
|
||||
queue.listeners = append(queue.listeners, listener)
|
||||
}
|
||||
|
||||
func (queue *Queue) Broadcast(message interface{}) {
|
||||
go func() {
|
||||
queue.eventLock.Lock()
|
||||
defer queue.eventLock.Unlock()
|
||||
|
||||
for _, channel := range queue.eventChannels {
|
||||
channel <- message
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (queue *Queue) SetName(name string) {
|
||||
queue.name = name
|
||||
queue.metrics.SetName(name)
|
||||
}
|
||||
|
||||
func (queue *Queue) SetNumConsumer(count int) {
|
||||
queue.consumerCount = count
|
||||
}
|
||||
|
||||
func (queue *Queue) SetNumProducer(count int) {
|
||||
queue.producerCount = count
|
||||
}
|
||||
|
||||
func (queue *Queue) Start() {
|
||||
queue.startProducers(queue.producerCount)
|
||||
queue.startConsumers(queue.consumerCount)
|
||||
|
||||
queue.producerRoutineGroup.Wait()
|
||||
close(queue.channel)
|
||||
queue.consumerRoutineGroup.Wait()
|
||||
}
|
||||
|
||||
func (queue *Queue) Stop() {
|
||||
close(queue.quit)
|
||||
}
|
||||
|
||||
func (queue *Queue) consume(eventChan chan interface{}) {
|
||||
var consumer Consumer
|
||||
|
||||
for {
|
||||
var err error
|
||||
if consumer, err = queue.consumerFactory(); err != nil {
|
||||
logx.Errorf("Error on creating consumer: %v", err)
|
||||
time.Sleep(time.Second)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-queue.channel:
|
||||
if ok {
|
||||
queue.consumeOne(consumer, message)
|
||||
} else {
|
||||
logx.Info("Task channel was closed, quitting consumer...")
|
||||
return
|
||||
}
|
||||
case event := <-eventChan:
|
||||
consumer.OnEvent(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (queue *Queue) consumeOne(consumer Consumer, message string) {
|
||||
threading.RunSafe(func() {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
duration := timex.Since(startTime)
|
||||
queue.metrics.Add(stat.Task{
|
||||
Duration: duration,
|
||||
})
|
||||
logx.WithDuration(duration).Infof("%s", message)
|
||||
}()
|
||||
|
||||
if err := consumer.Consume(message); err != nil {
|
||||
logx.Errorf("Error occurred while consuming %v: %v", message, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (queue *Queue) pause() {
|
||||
for _, listener := range queue.listeners {
|
||||
listener.OnPause()
|
||||
}
|
||||
}
|
||||
|
||||
func (queue *Queue) produce() {
|
||||
var producer Producer
|
||||
|
||||
for {
|
||||
var err error
|
||||
if producer, err = queue.producerFactory(); err != nil {
|
||||
logx.Errorf("Error on creating producer: %v", err)
|
||||
time.Sleep(time.Second)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
atomic.AddInt32(&queue.active, 1)
|
||||
producer.AddListener(routineListener{
|
||||
queue: queue,
|
||||
})
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-queue.quit:
|
||||
logx.Info("Quitting producer")
|
||||
return
|
||||
default:
|
||||
if v, ok := queue.produceOne(producer); ok {
|
||||
queue.channel <- v
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (queue *Queue) produceOne(producer Producer) (string, bool) {
|
||||
// avoid panic quit the producer, just log it and continue
|
||||
defer rescue.Recover()
|
||||
|
||||
return producer.Produce()
|
||||
}
|
||||
|
||||
func (queue *Queue) resume() {
|
||||
for _, listener := range queue.listeners {
|
||||
listener.OnResume()
|
||||
}
|
||||
}
|
||||
|
||||
func (queue *Queue) startConsumers(number int) {
|
||||
for i := 0; i < number; i++ {
|
||||
eventChan := make(chan interface{})
|
||||
queue.eventLock.Lock()
|
||||
queue.eventChannels = append(queue.eventChannels, eventChan)
|
||||
queue.eventLock.Unlock()
|
||||
queue.consumerRoutineGroup.Run(func() {
|
||||
queue.consume(eventChan)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (queue *Queue) startProducers(number int) {
|
||||
for i := 0; i < number; i++ {
|
||||
queue.producerRoutineGroup.Run(func() {
|
||||
queue.produce()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type routineListener struct {
|
||||
queue *Queue
|
||||
}
|
||||
|
||||
func (rl routineListener) OnProducerPause() {
|
||||
if atomic.AddInt32(&rl.queue.active, -1) <= 0 {
|
||||
rl.queue.pause()
|
||||
}
|
||||
}
|
||||
|
||||
func (rl routineListener) OnProducerResume() {
|
||||
if atomic.AddInt32(&rl.queue.active, 1) == 1 {
|
||||
rl.queue.resume()
|
||||
}
|
||||
}
|
||||
94
core/queue/queue_test.go
Normal file
94
core/queue/queue_test.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
const (
|
||||
consumers = 4
|
||||
rounds = 100
|
||||
)
|
||||
|
||||
func TestQueue(t *testing.T) {
|
||||
producer := newMockedProducer(rounds)
|
||||
consumer := newMockedConsumer()
|
||||
consumer.wait.Add(consumers)
|
||||
q := NewQueue(func() (Producer, error) {
|
||||
return producer, nil
|
||||
}, func() (Consumer, error) {
|
||||
return consumer, nil
|
||||
})
|
||||
q.AddListener(new(mockedListener))
|
||||
q.SetName("mockqueue")
|
||||
q.SetNumConsumer(consumers)
|
||||
q.SetNumProducer(1)
|
||||
q.pause()
|
||||
q.resume()
|
||||
go func() {
|
||||
producer.wait.Wait()
|
||||
q.Stop()
|
||||
}()
|
||||
q.Start()
|
||||
assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
|
||||
}
|
||||
|
||||
type mockedConsumer struct {
|
||||
count int32
|
||||
events int32
|
||||
wait sync.WaitGroup
|
||||
}
|
||||
|
||||
func newMockedConsumer() *mockedConsumer {
|
||||
return new(mockedConsumer)
|
||||
}
|
||||
|
||||
func (c *mockedConsumer) Consume(string) error {
|
||||
atomic.AddInt32(&c.count, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockedConsumer) OnEvent(interface{}) {
|
||||
if atomic.AddInt32(&c.events, 1) <= consumers {
|
||||
c.wait.Done()
|
||||
}
|
||||
}
|
||||
|
||||
type mockedProducer struct {
|
||||
total int32
|
||||
count int32
|
||||
wait sync.WaitGroup
|
||||
}
|
||||
|
||||
func newMockedProducer(total int32) *mockedProducer {
|
||||
p := new(mockedProducer)
|
||||
p.total = total
|
||||
p.wait.Add(int(total))
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *mockedProducer) AddListener(listener ProduceListener) {
|
||||
}
|
||||
|
||||
func (p *mockedProducer) Produce() (string, bool) {
|
||||
if atomic.AddInt32(&p.count, 1) <= p.total {
|
||||
p.wait.Done()
|
||||
return "item", true
|
||||
} else {
|
||||
time.Sleep(time.Second)
|
||||
return "", false
|
||||
}
|
||||
}
|
||||
|
||||
type mockedListener struct {
|
||||
}
|
||||
|
||||
func (l *mockedListener) OnPause() {
|
||||
}
|
||||
|
||||
func (l *mockedListener) OnResume() {
|
||||
}
|
||||
12
core/queue/util.go
Normal file
12
core/queue/util.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package queue
|
||||
|
||||
import "strings"
|
||||
|
||||
func generateName(pushers []Pusher) string {
|
||||
names := make([]string, len(pushers))
|
||||
for i, pusher := range pushers {
|
||||
names[i] = pusher.Name()
|
||||
}
|
||||
|
||||
return strings.Join(names, ",")
|
||||
}
|
||||
77
core/queue/util_test.go
Normal file
77
core/queue/util_test.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/mathx"
|
||||
)
|
||||
|
||||
var (
|
||||
proba = mathx.NewProba()
|
||||
failProba = 0.01
|
||||
)
|
||||
|
||||
func init() {
|
||||
logx.Disable()
|
||||
}
|
||||
|
||||
func TestGenerateName(t *testing.T) {
|
||||
pushers := []Pusher{
|
||||
&mockedPusher{name: "first"},
|
||||
&mockedPusher{name: "second"},
|
||||
&mockedPusher{name: "third"},
|
||||
}
|
||||
|
||||
assert.Equal(t, "first,second,third", generateName(pushers))
|
||||
}
|
||||
|
||||
func TestGenerateNameNil(t *testing.T) {
|
||||
var pushers []Pusher
|
||||
assert.Equal(t, "", generateName(pushers))
|
||||
}
|
||||
|
||||
func calcMean(vals []int) float64 {
|
||||
if len(vals) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
var result float64
|
||||
for _, val := range vals {
|
||||
result += float64(val)
|
||||
}
|
||||
return result / float64(len(vals))
|
||||
}
|
||||
|
||||
func calcVariance(mean float64, vals []int) float64 {
|
||||
if len(vals) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
var result float64
|
||||
for _, val := range vals {
|
||||
result += math.Pow(float64(val)-mean, 2)
|
||||
}
|
||||
return result / float64(len(vals))
|
||||
}
|
||||
|
||||
type mockedPusher struct {
|
||||
name string
|
||||
count int
|
||||
}
|
||||
|
||||
func (p *mockedPusher) Name() string {
|
||||
return p.name
|
||||
}
|
||||
|
||||
func (p *mockedPusher) Push(s string) error {
|
||||
if proba.TrueOnProba(failProba) {
|
||||
return errors.New("dummy")
|
||||
}
|
||||
|
||||
p.count++
|
||||
return nil
|
||||
}
|
||||
Binary file not shown.
23
readme.md
23
readme.md
@@ -144,27 +144,26 @@ go-zero是一个集成了各种工程实践的包含web和rpc框架,有如下
|
||||
|
||||
8 directories, 9 files
|
||||
```
|
||||
|
||||
生成的代码可以直接运行:
|
||||
|
||||
```shell
|
||||
|
||||
```shell
|
||||
cd greet
|
||||
go run greet.go -f etc/greet-api.json
|
||||
```
|
||||
|
||||
默认侦听在8888端口(可以在配置文件里修改),可以通过curl请求:
|
||||
|
||||
```shell
|
||||
|
||||
默认侦听在8888端口(可以在配置文件里修改),可以通过curl请求:
|
||||
|
||||
```shell
|
||||
➜ go-zero git:(master) curl -w "\ncode: %{http_code}\n" http://localhost:8888/greet/from/kevin
|
||||
{"code":0}
|
||||
code: 200
|
||||
```
|
||||
|
||||
编写业务代码:
|
||||
|
||||
* 可以在servicecontext.go里面传递依赖给logic,比如mysql, redis等
|
||||
|
||||
编写业务代码:
|
||||
|
||||
* 可以在servicecontext.go里面传递依赖给logic,比如mysql, redis等
|
||||
* 在api定义的get/post/put/delete等请求对应的logic里增加业务处理逻辑
|
||||
|
||||
|
||||
4. 可以根据api文件生成前端需要的Java, TypeScript, Dart, JavaScript代码
|
||||
|
||||
```shell
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
|
||||
"github.com/dgrijalva/jwt-go"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/rest/internal"
|
||||
"github.com/tal-tech/go-zero/rest/token"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -43,7 +43,7 @@ func Authorize(secret string, opts ...AuthorizeOption) func(http.Handler) http.H
|
||||
opt(&authOpts)
|
||||
}
|
||||
|
||||
parser := internal.NewTokenParser()
|
||||
parser := token.NewTokenParser()
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
token, err := parser.ParseToken(r, secret, authOpts.PrevSecret)
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"github.com/tal-tech/go-zero/core/breaker"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/tal-tech/go-zero/rest/internal"
|
||||
"github.com/tal-tech/go-zero/rest/httpx"
|
||||
"github.com/tal-tech/go-zero/rest/internal/security"
|
||||
)
|
||||
|
||||
@@ -22,7 +22,7 @@ func BreakerHandler(method, path string, metrics *stat.Metrics) func(http.Handle
|
||||
if err != nil {
|
||||
metrics.AddDrop()
|
||||
logx.Errorf("[http] dropped, %s - %s - %s",
|
||||
r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent())
|
||||
r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent())
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/tal-tech/go-zero/core/utils"
|
||||
"github.com/tal-tech/go-zero/rest/httpx"
|
||||
"github.com/tal-tech/go-zero/rest/internal"
|
||||
)
|
||||
|
||||
@@ -112,10 +113,10 @@ func logBrief(r *http.Request, code int, timer *utils.ElapsedTimer, logs *intern
|
||||
var buf bytes.Buffer
|
||||
duration := timer.Duration()
|
||||
buf.WriteString(fmt.Sprintf("%d - %s - %s - %s - %s",
|
||||
code, r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration)))
|
||||
code, r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration)))
|
||||
if duration > slowThreshold {
|
||||
logx.Slowf("[HTTP] %d - %s - %s - %s - slowcall(%s)",
|
||||
code, r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration))
|
||||
code, r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration))
|
||||
}
|
||||
|
||||
ok := isOkResponse(code)
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"github.com/tal-tech/go-zero/core/load"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/tal-tech/go-zero/rest/internal"
|
||||
"github.com/tal-tech/go-zero/rest/httpx"
|
||||
"github.com/tal-tech/go-zero/rest/internal/security"
|
||||
)
|
||||
|
||||
@@ -35,7 +35,7 @@ func SheddingHandler(shedder load.Shedder, metrics *stat.Metrics) func(http.Hand
|
||||
metrics.AddDrop()
|
||||
sheddingStat.IncrementDrop()
|
||||
logx.Errorf("[http] dropped, %s - %s - %s",
|
||||
r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent())
|
||||
r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent())
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -84,7 +84,6 @@ func ParseHeader(headerValue string) map[string]string {
|
||||
// Parses the post request which contains json in body.
|
||||
func ParseJsonBody(r *http.Request, v interface{}) error {
|
||||
var reader io.Reader
|
||||
|
||||
if withJsonBody(r) {
|
||||
reader = io.LimitReader(r.Body, maxBodyLen)
|
||||
} else {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package internal
|
||||
package httpx
|
||||
|
||||
import "net/http"
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package internal
|
||||
package httpx
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
@@ -16,4 +16,3 @@ func TestGetRemoteAddr(t *testing.T) {
|
||||
r.Header.Set(xForwardFor, host)
|
||||
assert.Equal(t, host, GetRemoteAddr(r))
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/rest/httpx"
|
||||
)
|
||||
|
||||
const LogContext = "request_logs"
|
||||
@@ -79,5 +80,5 @@ func formatf(r *http.Request, format string, v ...interface{}) string {
|
||||
}
|
||||
|
||||
func formatWithReq(r *http.Request, v string) string {
|
||||
return fmt.Sprintf("(%s - %s) %s", r.RequestURI, GetRemoteAddr(r), v)
|
||||
return fmt.Sprintf("(%s - %s) %s", r.RequestURI, httpx.GetRemoteAddr(r), v)
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/rest/httpx"
|
||||
"github.com/tal-tech/go-zero/rest/internal/router"
|
||||
"github.com/tal-tech/go-zero/rest/router"
|
||||
)
|
||||
|
||||
func TestWithMiddleware(t *testing.T) {
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
"github.com/tal-tech/go-zero/rest/handler"
|
||||
"github.com/tal-tech/go-zero/rest/httpx"
|
||||
"github.com/tal-tech/go-zero/rest/internal"
|
||||
"github.com/tal-tech/go-zero/rest/internal/router"
|
||||
"github.com/tal-tech/go-zero/rest/router"
|
||||
)
|
||||
|
||||
// use 1000m to represent 100%
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package internal
|
||||
package token
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
@@ -1,4 +1,4 @@
|
||||
package internal
|
||||
package token
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
@@ -1,6 +1,7 @@
|
||||
package gogen
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -55,6 +56,7 @@ func GoCommand(c *cli.Context) error {
|
||||
lang.Must(genLogic(dir, api))
|
||||
// it does not work
|
||||
format(dir)
|
||||
createGoModFileIfNeed(dir)
|
||||
|
||||
if err := backupAndSweep(apiFile); err != nil {
|
||||
return err
|
||||
@@ -98,7 +100,7 @@ func format(dir string) {
|
||||
cmd := exec.Command("go", "fmt", "./"+dir+"...")
|
||||
_, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
print(err.Error())
|
||||
fmt.Println(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,3 +133,43 @@ func sweep() error {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func createGoModFileIfNeed(dir string) {
|
||||
absDir, err := filepath.Abs(dir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var tempPath = absDir
|
||||
var hasGoMod = false
|
||||
for {
|
||||
if tempPath == filepath.Dir(tempPath) {
|
||||
break
|
||||
}
|
||||
tempPath = filepath.Dir(tempPath)
|
||||
if util.FileExists(filepath.Join(tempPath, goModeIdentifier)) {
|
||||
tempPath = filepath.Dir(tempPath)
|
||||
hasGoMod = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasGoMod {
|
||||
gopath := os.Getenv("GOPATH")
|
||||
parent := path.Join(gopath, "src")
|
||||
pos := strings.Index(absDir, parent)
|
||||
if pos < 0 {
|
||||
moduleName := absDir[len(filepath.Dir(absDir))+1:]
|
||||
cmd := exec.Command("go", "mod", "init", moduleName)
|
||||
cmd.Dir = dir
|
||||
var stdout, stderr bytes.Buffer
|
||||
cmd.Stdout = &stdout
|
||||
cmd.Stderr = &stderr
|
||||
err := cmd.Run()
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
}
|
||||
outStr, errStr := string(stdout.Bytes()), string(stderr.Bytes())
|
||||
fmt.Printf(outStr + "\n" + errStr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +28,9 @@ func getParentPackage(dir string) (string, error) {
|
||||
var tempPath = absDir
|
||||
var hasGoMod = false
|
||||
for {
|
||||
if tempPath == filepath.Dir(tempPath) {
|
||||
break
|
||||
}
|
||||
tempPath = filepath.Dir(tempPath)
|
||||
if goctlutil.FileExists(filepath.Join(tempPath, goModeIdentifier)) {
|
||||
tempPath = filepath.Dir(tempPath)
|
||||
|
||||
Reference in New Issue
Block a user