Compare commits

...

60 Commits

Author SHA1 Message Date
kevin
456b395860 use predefined endpoint separator 2020-08-25 18:36:30 +08:00
kevin
f3c367a323 add fatal to stderr 2020-08-25 16:59:14 +08:00
kevin
a32028c4fb add etcd deploy yaml 2020-08-25 16:32:01 +08:00
kevin
b4572fa064 add more tests 2020-08-24 23:09:46 +08:00
kevin
ccbabf6f58 add more tests 2020-08-24 18:18:58 +08:00
kevin
5989444227 add more tests 2020-08-23 22:33:20 +08:00
kevin
dc286a03f5 add more tests 2020-08-23 15:53:10 +08:00
kevin
b82c02ed16 add more tests 2020-08-22 23:08:33 +08:00
kevin
59ba4ecc5b accelerate tests 2020-08-21 23:24:07 +08:00
kevin
5e7b514ae2 make tests parallel 2020-08-21 23:15:45 +08:00
kevin
2b1466e41e add more tests 2020-08-21 23:09:35 +08:00
kevin
9c9f80518f update readme 2020-08-21 22:51:04 +08:00
kevin
25973d6b59 update doc, add architecture picture 2020-08-21 20:09:53 +08:00
kevin
6237d01948 make test stable 2020-08-21 16:57:17 +08:00
kevin
49316b113e update readme 2020-08-21 16:52:17 +08:00
kevin
6a673e8cb0 add more tests 2020-08-21 16:42:08 +08:00
kingxt
0efa28ddbd fix generate api demo (#19)
Co-authored-by: kingxt <dream4kingxt@163.com>
2020-08-21 13:47:35 +08:00
kevin
0b6a13fe84 add more tests 2020-08-20 22:53:18 +08:00
kevin
11aa6668e8 add more tests 2020-08-20 15:35:13 +08:00
kevin
267a283328 reorg imports 2020-08-20 10:46:39 +08:00
kevin
2d8366b30e update keywords.md 2020-08-20 10:44:14 +08:00
Keson
db83843558 gocctl model v20200819 (#18)
* rename snake、came method

* new: generate model from data source

* add change log md

* update model doc

* update  doc

* beauty code
2020-08-20 10:29:18 +08:00
kevin
50565c9765 update doc 2020-08-19 22:34:54 +08:00
kevin
4c02a19a14 update stringx doc 2020-08-19 18:23:37 +08:00
kevin
a1b990c5ec update stringx doc 2020-08-19 18:22:41 +08:00
kevin
2607bb8863 update image alt in doc 2020-08-19 18:19:38 +08:00
kevin
5bf37535fe update image scale in doc 2020-08-19 18:18:44 +08:00
kevin
ed85775fd5 fix render problem in doc 2020-08-19 18:09:03 +08:00
kevin
418f8f6666 add keywords utility example 2020-08-19 17:58:57 +08:00
kevin
22e75cdf78 add release badge 2020-08-19 17:01:22 +08:00
kevin
e79c42add1 add go report badge 2020-08-19 16:10:43 +08:00
kevin
9e14820698 fix golint warnings 2020-08-19 16:00:55 +08:00
kevin
2ebb5b6b58 support customized mask char on trie 2020-08-19 14:54:59 +08:00
kevin
2673dbc6e1 add benchmark 2020-08-19 12:43:14 +08:00
Keson
d21d770b5b goctl model reactor (#15)
* reactor sql generation

* reactor sql generation

* add console & example

* optimize unit test & add document

* modify default config

* remove test file

* Revert "remove test file"

This reverts commit 81041f9e

* fix stringx.go & optimize example

* remove unused code
2020-08-19 10:41:19 +08:00
Steven Zack
1252bd9cde goctl生成Kotlin代码优化 (#16)
* 修复Kotlin连接失败抛出Exception;添加Kotlin连接超时

* 修复路径参数导致生成的Kotlin函数名带有:问题

* Added HTTP Patch Method

* kotlin-add-patch-support

* format-imports
2020-08-18 21:49:31 +08:00
kevin
054d9b5540 rename rest files 2020-08-18 20:20:44 +08:00
kevin
f03cfb0ff7 support direct scheme on rpc resolver 2020-08-18 18:36:44 +08:00
kevin
0214161bfc remove utils.Report 2020-08-17 18:05:56 +08:00
stevenzack
d4e38cb7f0 rename-Api 2020-08-17 16:54:56 +08:00
stevenzack
693a8b627a fix-log-fatal 2020-08-17 16:54:56 +08:00
stevenzack
701208b6f4 fix FileNotFoundException when response code is 4xx or 5xx 2020-08-17 16:54:56 +08:00
stevenzack
b65fcc5512 fix-lang-must-not-found 2020-08-17 16:54:56 +08:00
stevenzack
3321ed3519 multi-http-method-support 2020-08-17 16:54:56 +08:00
stevenzack
5e007c1f9f remove-logx 2020-08-17 16:54:56 +08:00
stevenzack
de2f8c06fb fix-break-line 2020-08-17 16:54:56 +08:00
stevenzack
926d746df5 Add goctl kotlin support 2020-08-17 16:54:56 +08:00
kevin
4b636cd293 refactor names 2020-08-16 23:08:29 +08:00
Klaus
4bdf5e4c90 chore: fix typo 2020-08-16 22:32:56 +08:00
kevin
721b7def7c add license badge 2020-08-15 15:16:57 +08:00
kevin
f294090130 update codecov settings 2020-08-15 15:11:55 +08:00
kevin
489980ea0f add codecov report 2020-08-15 14:53:15 +08:00
kevin
e12c8ae993 add codecov badge 2020-08-15 14:44:37 +08:00
kevin
21aad62513 use default decay value from finagle 2020-08-14 22:24:11 +08:00
kevin
0b08aca554 format readme 2020-08-14 15:33:00 +08:00
kevin
6ef1b5e14c update doc 2020-08-14 15:31:10 +08:00
kevin
8745039877 move lang.Must into logx.Must to make sure output fatal message as json 2020-08-14 15:08:06 +08:00
kevin
9d9399ad10 confirm addition after add called in periodical executor 2020-08-14 11:50:01 +08:00
kevin
e7dd04701c add more tests 2020-08-14 11:24:56 +08:00
kevin
a3d7474ae0 fix data race 2020-08-14 11:03:16 +08:00
166 changed files with 4223 additions and 3037 deletions

4
.codecov.yml Normal file
View File

@@ -0,0 +1,4 @@
ignore:
- "doc"
- "example"
- "tools"

View File

@@ -27,4 +27,9 @@ jobs:
go get -v -t -d ./...
- name: Test
run: go test -v -race ./...
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
- name: Codecov
uses: codecov/codecov-action@v1.0.6
with:
token: ${{secrets.CODECOV_TOKEN}}

View File

@@ -1,18 +0,0 @@
stages:
- analysis
variables:
GOPATH: '/runner-cache/zero'
GOCACHE: '/runner-cache/zero'
GOPROXY: 'https://goproxy.cn,direct'
analysis:
stage: analysis
image: golang
script:
- go version && go env
- go test -short $(go list ./...) | grep -v "no test"
only:
- merge_requests
tags:
- common

View File

@@ -60,17 +60,15 @@ func do(name string, execute func(b Breaker) error) error {
lock.RUnlock()
if ok {
return execute(b)
} else {
lock.Lock()
b, ok = breakers[name]
if ok {
lock.Unlock()
return execute(b)
} else {
b = NewBreaker(WithName(name))
breakers[name] = b
lock.Unlock()
return execute(b)
}
}
lock.Lock()
b, ok = breakers[name]
if !ok {
b = NewBreaker(WithName(name))
breakers[name] = b
}
lock.Unlock()
return execute(b)
}

View File

@@ -72,9 +72,9 @@ func TestCacheWithLruEvicts(t *testing.T) {
cache.Set("third", "third element")
cache.Set("fourth", "fourth element")
value, ok := cache.Get("first")
_, ok := cache.Get("first")
assert.False(t, ok)
value, ok = cache.Get("second")
value, ok := cache.Get("second")
assert.True(t, ok)
assert.Equal(t, "second element", value)
value, ok = cache.Get("third")
@@ -94,9 +94,9 @@ func TestCacheWithLruEvicted(t *testing.T) {
cache.Set("third", "third element")
cache.Set("fourth", "fourth element")
value, ok := cache.Get("first")
_, ok := cache.Get("first")
assert.False(t, ok)
value, ok = cache.Get("second")
value, ok := cache.Get("second")
assert.True(t, ok)
assert.Equal(t, "second element", value)
cache.Set("fifth", "fifth element")

View File

@@ -213,7 +213,10 @@ func TestTimingWheel_SetTimer(t *testing.T) {
}
for _, test := range tests {
test := test
t.Run(stringx.RandId(), func(t *testing.T) {
t.Parallel()
var count int32
ticker := timex.NewFakeTicker()
tick := func() {
@@ -291,7 +294,10 @@ func TestTimingWheel_SetAndMoveThenStart(t *testing.T) {
}
for _, test := range tests {
test := test
t.Run(stringx.RandId(), func(t *testing.T) {
t.Parallel()
var count int32
ticker := timex.NewFakeTicker()
tick := func() {
@@ -376,7 +382,10 @@ func TestTimingWheel_SetAndMoveTwice(t *testing.T) {
}
for _, test := range tests {
test := test
t.Run(stringx.RandId(), func(t *testing.T) {
t.Parallel()
var count int32
ticker := timex.NewFakeTicker()
tick := func() {
@@ -454,7 +463,10 @@ func TestTimingWheel_ElapsedAndSet(t *testing.T) {
}
for _, test := range tests {
test := test
t.Run(stringx.RandId(), func(t *testing.T) {
t.Parallel()
var count int32
ticker := timex.NewFakeTicker()
tick := func() {
@@ -542,7 +554,10 @@ func TestTimingWheel_ElapsedAndSetThenMove(t *testing.T) {
}
for _, test := range tests {
test := test
t.Run(stringx.RandId(), func(t *testing.T) {
t.Parallel()
var count int32
ticker := timex.NewFakeTicker()
tick := func() {

View File

@@ -34,7 +34,7 @@ func TestContextCancel(t *testing.T) {
assert.NotEqual(t, context.Canceled, c2.Err())
}
func TestConextDeadline(t *testing.T) {
func TestContextDeadline(t *testing.T) {
c, _ := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond))
o := ValueOnlyFrom(c)
select {

View File

@@ -2,7 +2,7 @@ package discov
import (
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
type (
@@ -26,7 +26,7 @@ func NewFacade(endpoints []string) Facade {
func (f Facade) Client() internal.EtcdClient {
conn, err := f.registry.GetConn(f.endpoints)
lang.Must(err)
logx.Must(err)
return conn
}

View File

@@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: discov

View File

@@ -0,0 +1,368 @@
apiVersion: v1
kind: Service
metadata:
name: etcd
namespace: discov
spec:
ports:
- name: etcd-port
port: 2379
protocol: TCP
targetPort: 2379
selector:
app: etcd
---
apiVersion: v1
kind: Pod
metadata:
labels:
app: etcd
etcd_node: etcd0
name: etcd0
namespace: discov
spec:
containers:
- command:
- /usr/local/bin/etcd
- --name
- etcd0
- --initial-advertise-peer-urls
- http://etcd0:2380
- --listen-peer-urls
- http://0.0.0.0:2380
- --listen-client-urls
- http://0.0.0.0:2379
- --advertise-client-urls
- http://etcd0:2379
- --initial-cluster
- etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380,etcd4=http://etcd4:2380
- --initial-cluster-state
- new
image: quay.io/coreos/etcd:latest
name: etcd0
ports:
- containerPort: 2379
name: client
protocol: TCP
- containerPort: 2380
name: server
protocol: TCP
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- etcd
topologyKey: "kubernetes.io/hostname"
restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
labels:
etcd_node: etcd0
name: etcd0
namespace: discov
spec:
ports:
- name: client
port: 2379
protocol: TCP
targetPort: 2379
- name: server
port: 2380
protocol: TCP
targetPort: 2380
selector:
etcd_node: etcd0
---
apiVersion: v1
kind: Pod
metadata:
labels:
app: etcd
etcd_node: etcd1
name: etcd1
namespace: discov
spec:
containers:
- command:
- /usr/local/bin/etcd
- --name
- etcd1
- --initial-advertise-peer-urls
- http://etcd1:2380
- --listen-peer-urls
- http://0.0.0.0:2380
- --listen-client-urls
- http://0.0.0.0:2379
- --advertise-client-urls
- http://etcd1:2379
- --initial-cluster
- etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380,etcd4=http://etcd4:2380
- --initial-cluster-state
- new
image: quay.io/coreos/etcd:latest
name: etcd1
ports:
- containerPort: 2379
name: client
protocol: TCP
- containerPort: 2380
name: server
protocol: TCP
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- etcd
topologyKey: "kubernetes.io/hostname"
restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
labels:
etcd_node: etcd1
name: etcd1
namespace: discov
spec:
ports:
- name: client
port: 2379
protocol: TCP
targetPort: 2379
- name: server
port: 2380
protocol: TCP
targetPort: 2380
selector:
etcd_node: etcd1
---
apiVersion: v1
kind: Pod
metadata:
labels:
app: etcd
etcd_node: etcd2
name: etcd2
namespace: discov
spec:
containers:
- command:
- /usr/local/bin/etcd
- --name
- etcd2
- --initial-advertise-peer-urls
- http://etcd2:2380
- --listen-peer-urls
- http://0.0.0.0:2380
- --listen-client-urls
- http://0.0.0.0:2379
- --advertise-client-urls
- http://etcd2:2379
- --initial-cluster
- etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380,etcd4=http://etcd4:2380
- --initial-cluster-state
- new
image: quay.io/coreos/etcd:latest
name: etcd2
ports:
- containerPort: 2379
name: client
protocol: TCP
- containerPort: 2380
name: server
protocol: TCP
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- etcd
topologyKey: "kubernetes.io/hostname"
restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
labels:
etcd_node: etcd2
name: etcd2
namespace: discov
spec:
ports:
- name: client
port: 2379
protocol: TCP
targetPort: 2379
- name: server
port: 2380
protocol: TCP
targetPort: 2380
selector:
etcd_node: etcd2
---
apiVersion: v1
kind: Pod
metadata:
labels:
app: etcd
etcd_node: etcd3
name: etcd3
namespace: discov
spec:
containers:
- command:
- /usr/local/bin/etcd
- --name
- etcd3
- --initial-advertise-peer-urls
- http://etcd3:2380
- --listen-peer-urls
- http://0.0.0.0:2380
- --listen-client-urls
- http://0.0.0.0:2379
- --advertise-client-urls
- http://etcd3:2379
- --initial-cluster
- etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380,etcd4=http://etcd4:2380
- --initial-cluster-state
- new
image: quay.io/coreos/etcd:latest
name: etcd3
ports:
- containerPort: 2379
name: client
protocol: TCP
- containerPort: 2380
name: server
protocol: TCP
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- etcd
topologyKey: "kubernetes.io/hostname"
restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
labels:
etcd_node: etcd3
name: etcd3
namespace: discov
spec:
ports:
- name: client
port: 2379
protocol: TCP
targetPort: 2379
- name: server
port: 2380
protocol: TCP
targetPort: 2380
selector:
etcd_node: etcd3
---
apiVersion: v1
kind: Pod
metadata:
labels:
app: etcd
etcd_node: etcd4
name: etcd4
namespace: discov
spec:
containers:
- command:
- /usr/local/bin/etcd
- --name
- etcd4
- --initial-advertise-peer-urls
- http://etcd4:2380
- --listen-peer-urls
- http://0.0.0.0:2380
- --listen-client-urls
- http://0.0.0.0:2379
- --advertise-client-urls
- http://etcd4:2379
- --initial-cluster
- etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380,etcd4=http://etcd4:2380
- --initial-cluster-state
- new
image: quay.io/coreos/etcd:latest
name: etcd4
ports:
- containerPort: 2379
name: client
protocol: TCP
- containerPort: 2380
name: server
protocol: TCP
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- etcd
topologyKey: "kubernetes.io/hostname"
restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
labels:
etcd_node: etcd4
name: etcd4
namespace: discov
spec:
ports:
- name: client
port: 2379
protocol: TCP
targetPort: 2379
- name: server
port: 2380
protocol: TCP
targetPort: 2380
selector:
etcd_node: etcd4

View File

@@ -12,14 +12,14 @@ func TestBulkExecutor(t *testing.T) {
var values []int
var lock sync.Mutex
exeutor := NewBulkExecutor(func(items []interface{}) {
executor := NewBulkExecutor(func(items []interface{}) {
lock.Lock()
values = append(values, len(items))
lock.Unlock()
}, WithBulkTasks(10), WithBulkInterval(time.Minute))
for i := 0; i < 50; i++ {
exeutor.Add(1)
executor.Add(1)
time.Sleep(time.Millisecond)
}
@@ -40,13 +40,13 @@ func TestBulkExecutorFlushInterval(t *testing.T) {
var wait sync.WaitGroup
wait.Add(1)
exeutor := NewBulkExecutor(func(items []interface{}) {
executor := NewBulkExecutor(func(items []interface{}) {
assert.Equal(t, size, len(items))
wait.Done()
}, WithBulkTasks(caches), WithBulkInterval(time.Millisecond*100))
for i := 0; i < size; i++ {
exeutor.Add(1)
executor.Add(1)
}
wait.Wait()

View File

@@ -12,14 +12,14 @@ func TestChunkExecutor(t *testing.T) {
var values []int
var lock sync.Mutex
exeutor := NewChunkExecutor(func(items []interface{}) {
executor := NewChunkExecutor(func(items []interface{}) {
lock.Lock()
values = append(values, len(items))
lock.Unlock()
}, WithChunkBytes(10), WithFlushInterval(time.Minute))
for i := 0; i < 50; i++ {
exeutor.Add(1, 1)
executor.Add(1, 1)
time.Sleep(time.Millisecond)
}
@@ -40,13 +40,13 @@ func TestChunkExecutorFlushInterval(t *testing.T) {
var wait sync.WaitGroup
wait.Add(1)
exeutor := NewChunkExecutor(func(items []interface{}) {
executor := NewChunkExecutor(func(items []interface{}) {
assert.Equal(t, size, len(items))
wait.Done()
}, WithChunkBytes(caches), WithFlushInterval(time.Millisecond*100))
for i := 0; i < size; i++ {
exeutor.Add(1, 1)
executor.Add(1, 1)
}
wait.Wait()

View File

@@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
@@ -32,19 +33,21 @@ type (
container TaskContainer
waitGroup sync.WaitGroup
// avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
wgBarrier syncx.Barrier
guarded bool
newTicker func(duration time.Duration) timex.Ticker
lock sync.Mutex
wgBarrier syncx.Barrier
confirmChan chan lang.PlaceholderType
guarded bool
newTicker func(duration time.Duration) timex.Ticker
lock sync.Mutex
}
)
func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
executor := &PeriodicalExecutor{
// buffer 1 to let the caller go quickly
commander: make(chan interface{}, 1),
interval: interval,
container: container,
commander: make(chan interface{}, 1),
interval: interval,
container: container,
confirmChan: make(chan lang.PlaceholderType),
newTicker: func(d time.Duration) timex.Ticker {
return timex.NewTicker(interval)
},
@@ -59,10 +62,12 @@ func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *Per
func (pe *PeriodicalExecutor) Add(task interface{}) {
if vals, ok := pe.addAndCheck(task); ok {
pe.commander <- vals
<-pe.confirmChan
}
}
func (pe *PeriodicalExecutor) Flush() bool {
pe.enterExecution()
return pe.executeTasks(func() interface{} {
pe.lock.Lock()
defer pe.lock.Unlock()
@@ -114,6 +119,8 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
select {
case vals := <-pe.commander:
commanded = true
pe.enterExecution()
pe.confirmChan <- lang.Placeholder
pe.executeTasks(vals)
last = timex.Now()
case <-ticker.Chan():
@@ -135,13 +142,18 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
})
}
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
func (pe *PeriodicalExecutor) doneExecution() {
pe.waitGroup.Done()
}
func (pe *PeriodicalExecutor) enterExecution() {
pe.wgBarrier.Guard(func() {
pe.waitGroup.Add(1)
})
defer pe.wgBarrier.Guard(func() {
pe.waitGroup.Done()
})
}
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
defer pe.doneExecution()
ok := pe.hasTasks(tasks)
if ok {

View File

@@ -106,6 +106,40 @@ func TestPeriodicalExecutor_Bulk(t *testing.T) {
lock.Unlock()
}
func TestPeriodicalExecutor_Wait(t *testing.T) {
var lock sync.Mutex
executer := NewBulkExecutor(func(tasks []interface{}) {
lock.Lock()
defer lock.Unlock()
time.Sleep(10 * time.Millisecond)
}, WithBulkTasks(1), WithBulkInterval(time.Second))
for i := 0; i < 10; i++ {
executer.Add(1)
}
executer.Flush()
executer.Wait()
}
func TestPeriodicalExecutor_WaitFast(t *testing.T) {
const total = 3
var cnt int
var lock sync.Mutex
executer := NewBulkExecutor(func(tasks []interface{}) {
defer func() {
cnt++
}()
lock.Lock()
defer lock.Unlock()
time.Sleep(10 * time.Millisecond)
}, WithBulkTasks(1), WithBulkInterval(10*time.Millisecond))
for i := 0; i < total; i++ {
executer.Add(2)
}
executer.Flush()
executer.Wait()
assert.Equal(t, total, cnt)
}
// go test -benchtime 10s -bench .
func BenchmarkExecutor(b *testing.B) {
b.ReportAllocs()

View File

@@ -4,9 +4,8 @@ import (
"os"
"testing"
"github.com/tal-tech/go-zero/core/fs"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs"
)
const (
@@ -15,34 +14,34 @@ const (
text = `first line
Cum sociis natoque penatibus et magnis dis parturient. Phasellus laoreet lorem vel dolor tempus vehicula. Vivamus sagittis lacus vel augue laoreet rutrum faucibus. Integer legentibus erat a ante historiarum dapibus.
Quisque ut dolor gravida, placerat libero vel, euismod. Quam temere in vitiis, legem sancimus haerentia. Qui ipsorum lingua Celtae, nostra Galli appellantur. Quis aute iure reprehenderit in voluptate velit esse. Fabio vel iudice vincam, sunt in culpa qui officia. Cras mattis iudicium purus sit amet fermentum.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculus sed magna.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculous sed magna.
Magna pars studiorum, prodita quaerimus. Cum ceteris in veneratione tui montes, nascetur mus. Morbi odio eros, volutpat ut pharetra vitae, lobortis sed nibh. Plura mihi bona sunt, inclinet, amari petere vellent. Idque Caesaris facere voluntate liceret: sese habere. Tu quoque, Brute, fili mi, nihil timor populi, nihil!
Tityre, tu patulae recubans sub tegmine fagi dolor. Inmensae subtilitatis, obscuris et malesuada fames. Quae vero auctorem tractata ab fiducia dicuntur.
Cum sociis natoque penatibus et magnis dis parturient. Phasellus laoreet lorem vel dolor tempus vehicula. Vivamus sagittis lacus vel augue laoreet rutrum faucibus. Integer legentibus erat a ante historiarum dapibus.
Quisque ut dolor gravida, placerat libero vel, euismod. Quam temere in vitiis, legem sancimus haerentia. Qui ipsorum lingua Celtae, nostra Galli appellantur. Quis aute iure reprehenderit in voluptate velit esse. Fabio vel iudice vincam, sunt in culpa qui officia. Cras mattis iudicium purus sit amet fermentum.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculus sed magna.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculous sed magna.
Magna pars studiorum, prodita quaerimus. Cum ceteris in veneratione tui montes, nascetur mus. Morbi odio eros, volutpat ut pharetra vitae, lobortis sed nibh. Plura mihi bona sunt, inclinet, amari petere vellent. Idque Caesaris facere voluntate liceret: sese habere. Tu quoque, Brute, fili mi, nihil timor populi, nihil!
Tityre, tu patulae recubans sub tegmine fagi dolor. Inmensae subtilitatis, obscuris et malesuada fames. Quae vero auctorem tractata ab fiducia dicuntur.
Cum sociis natoque penatibus et magnis dis parturient. Phasellus laoreet lorem vel dolor tempus vehicula. Vivamus sagittis lacus vel augue laoreet rutrum faucibus. Integer legentibus erat a ante historiarum dapibus.
Quisque ut dolor gravida, placerat libero vel, euismod. Quam temere in vitiis, legem sancimus haerentia. Qui ipsorum lingua Celtae, nostra Galli appellantur. Quis aute iure reprehenderit in voluptate velit esse. Fabio vel iudice vincam, sunt in culpa qui officia. Cras mattis iudicium purus sit amet fermentum.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculus sed magna.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculous sed magna.
Magna pars studiorum, prodita quaerimus. Cum ceteris in veneratione tui montes, nascetur mus. Morbi odio eros, volutpat ut pharetra vitae, lobortis sed nibh. Plura mihi bona sunt, inclinet, amari petere vellent. Idque Caesaris facere voluntate liceret: sese habere. Tu quoque, Brute, fili mi, nihil timor populi, nihil!
Tityre, tu patulae recubans sub tegmine fagi dolor. Inmensae subtilitatis, obscuris et malesuada fames. Quae vero auctorem tractata ab fiducia dicuntur.
` + longLine
textWithLastNewline = `first line
Cum sociis natoque penatibus et magnis dis parturient. Phasellus laoreet lorem vel dolor tempus vehicula. Vivamus sagittis lacus vel augue laoreet rutrum faucibus. Integer legentibus erat a ante historiarum dapibus.
Quisque ut dolor gravida, placerat libero vel, euismod. Quam temere in vitiis, legem sancimus haerentia. Qui ipsorum lingua Celtae, nostra Galli appellantur. Quis aute iure reprehenderit in voluptate velit esse. Fabio vel iudice vincam, sunt in culpa qui officia. Cras mattis iudicium purus sit amet fermentum.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculus sed magna.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculous sed magna.
Magna pars studiorum, prodita quaerimus. Cum ceteris in veneratione tui montes, nascetur mus. Morbi odio eros, volutpat ut pharetra vitae, lobortis sed nibh. Plura mihi bona sunt, inclinet, amari petere vellent. Idque Caesaris facere voluntate liceret: sese habere. Tu quoque, Brute, fili mi, nihil timor populi, nihil!
Tityre, tu patulae recubans sub tegmine fagi dolor. Inmensae subtilitatis, obscuris et malesuada fames. Quae vero auctorem tractata ab fiducia dicuntur.
Cum sociis natoque penatibus et magnis dis parturient. Phasellus laoreet lorem vel dolor tempus vehicula. Vivamus sagittis lacus vel augue laoreet rutrum faucibus. Integer legentibus erat a ante historiarum dapibus.
Quisque ut dolor gravida, placerat libero vel, euismod. Quam temere in vitiis, legem sancimus haerentia. Qui ipsorum lingua Celtae, nostra Galli appellantur. Quis aute iure reprehenderit in voluptate velit esse. Fabio vel iudice vincam, sunt in culpa qui officia. Cras mattis iudicium purus sit amet fermentum.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculus sed magna.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculous sed magna.
Magna pars studiorum, prodita quaerimus. Cum ceteris in veneratione tui montes, nascetur mus. Morbi odio eros, volutpat ut pharetra vitae, lobortis sed nibh. Plura mihi bona sunt, inclinet, amari petere vellent. Idque Caesaris facere voluntate liceret: sese habere. Tu quoque, Brute, fili mi, nihil timor populi, nihil!
Tityre, tu patulae recubans sub tegmine fagi dolor. Inmensae subtilitatis, obscuris et malesuada fames. Quae vero auctorem tractata ab fiducia dicuntur.
Cum sociis natoque penatibus et magnis dis parturient. Phasellus laoreet lorem vel dolor tempus vehicula. Vivamus sagittis lacus vel augue laoreet rutrum faucibus. Integer legentibus erat a ante historiarum dapibus.
Quisque ut dolor gravida, placerat libero vel, euismod. Quam temere in vitiis, legem sancimus haerentia. Qui ipsorum lingua Celtae, nostra Galli appellantur. Quis aute iure reprehenderit in voluptate velit esse. Fabio vel iudice vincam, sunt in culpa qui officia. Cras mattis iudicium purus sit amet fermentum.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculus sed magna.
Quo usque tandem abutere, Catilina, patientia nostra? Gallia est omnis divisa in partes tres, quarum. Quam diu etiam furor iste tuus nos eludet? Quid securi etiam tamquam eu fugiat nulla pariatur. Curabitur blandit tempus ardua ridiculous sed magna.
Magna pars studiorum, prodita quaerimus. Cum ceteris in veneratione tui montes, nascetur mus. Morbi odio eros, volutpat ut pharetra vitae, lobortis sed nibh. Plura mihi bona sunt, inclinet, amari petere vellent. Idque Caesaris facere voluntate liceret: sese habere. Tu quoque, Brute, fili mi, nihil timor populi, nihil!
Tityre, tu patulae recubans sub tegmine fagi dolor. Inmensae subtilitatis, obscuris et malesuada fames. Quae vero auctorem tractata ab fiducia dicuntur.
` + longLine + "\n"

View File

@@ -49,7 +49,7 @@ func From(generate GenerateFunc) Stream {
return Range(source)
}
// Just converts the given arbitary items to a Stream.
// Just converts the given arbitrary items to a Stream.
func Just(items ...interface{}) Stream {
source := make(chan interface{}, len(items))
for _, item := range items {
@@ -195,7 +195,7 @@ func (p Stream) Merge() Stream {
return Range(source)
}
// Parallel applies the given ParallenFunc to each item concurrently with given number of workers.
// Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
func (p Stream) Parallel(fn ParallelFunc, opts ...Option) {
p.Walk(func(item interface{}, pipe chan<- interface{}) {
fn(item)

View File

@@ -1,16 +1,8 @@
package lang
import "log"
var Placeholder PlaceholderType
type (
GenericType = interface{}
PlaceholderType = struct{}
)
func Must(err error) {
if err != nil {
log.Fatal(err)
}
}

View File

@@ -1,7 +0,0 @@
package lang
import "testing"
func TestMust(t *testing.T) {
Must(nil)
}

View File

@@ -17,7 +17,6 @@ import (
"sync/atomic"
"github.com/tal-tech/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/sysx"
"github.com/tal-tech/go-zero/core/timex"
)
@@ -46,6 +45,7 @@ const (
levelInfo = "info"
levelError = "error"
levelSevere = "severe"
levelFatal = "fatal"
levelSlow = "slow"
levelStat = "stat"
@@ -100,7 +100,7 @@ type (
)
func MustSetup(c LogConf) {
lang.Must(SetUp(c))
Must(SetUp(c))
}
// SetUp sets up the logx. If already set up, just return nil.
@@ -210,6 +210,15 @@ func Infof(format string, v ...interface{}) {
infoSync(fmt.Sprintf(format, v...))
}
func Must(err error) {
if err != nil {
msg := formatWithCaller(err.Error(), 3)
log.Print(msg)
output(severeLog, levelFatal, msg)
os.Exit(1)
}
}
func SetLevel(level uint32) {
atomic.StoreUint32(&logLevel, level)
}

View File

@@ -131,6 +131,10 @@ func TestSetLevelWithDuration(t *testing.T) {
assert.Equal(t, 0, writer.builder.Len())
}
func TestMustNil(t *testing.T) {
Must(nil)
}
func BenchmarkCopyByteSliceAppend(b *testing.B) {
for i := 0; i < b.N; i++ {
var buf []byte

View File

@@ -9,24 +9,24 @@ import (
var ErrNoAvailablePusher = errors.New("no available pusher")
type BalancedQueuePusher struct {
type BalancedPusher struct {
name string
pushers []Pusher
index uint64
}
func NewBalancedQueuePusher(pushers []Pusher) Pusher {
return &BalancedQueuePusher{
func NewBalancedPusher(pushers []Pusher) Pusher {
return &BalancedPusher{
name: generateName(pushers),
pushers: pushers,
}
}
func (pusher *BalancedQueuePusher) Name() string {
func (pusher *BalancedPusher) Name() string {
return pusher.name
}
func (pusher *BalancedQueuePusher) Push(message string) error {
func (pusher *BalancedPusher) Push(message string) error {
size := len(pusher.pushers)
for i := 0; i < size; i++ {

View File

@@ -20,7 +20,7 @@ func TestBalancedQueuePusher(t *testing.T) {
mockedPushers = append(mockedPushers, p)
}
pusher := NewBalancedQueuePusher(pushers)
pusher := NewBalancedPusher(pushers)
assert.True(t, len(pusher.Name()) > 0)
for i := 0; i < numPushers*1000; i++ {
@@ -37,7 +37,7 @@ func TestBalancedQueuePusher(t *testing.T) {
}
func TestBalancedQueuePusher_NoAvailable(t *testing.T) {
pusher := NewBalancedQueuePusher(nil)
pusher := NewBalancedPusher(nil)
assert.True(t, len(pusher.Name()) == 0)
assert.Equal(t, ErrNoAvailablePusher, pusher.Push("item"))
}

View File

@@ -2,23 +2,23 @@ package queue
import "github.com/tal-tech/go-zero/core/errorx"
type MultiQueuePusher struct {
type MultiPusher struct {
name string
pushers []Pusher
}
func NewMultiQueuePusher(pushers []Pusher) Pusher {
return &MultiQueuePusher{
func NewMultiPusher(pushers []Pusher) Pusher {
return &MultiPusher{
name: generateName(pushers),
pushers: pushers,
}
}
func (pusher *MultiQueuePusher) Name() string {
func (pusher *MultiPusher) Name() string {
return pusher.name
}
func (pusher *MultiQueuePusher) Push(message string) error {
func (pusher *MultiPusher) Push(message string) error {
var batchError errorx.BatchError
for _, each := range pusher.pushers {

View File

@@ -21,7 +21,7 @@ func TestMultiQueuePusher(t *testing.T) {
mockedPushers = append(mockedPushers, p)
}
pusher := NewMultiQueuePusher(pushers)
pusher := NewMultiPusher(pushers)
assert.True(t, len(pusher.Name()) > 0)
for i := 0; i < 1000; i++ {

View File

@@ -14,7 +14,6 @@ import (
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/sysx"
"github.com/tal-tech/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/utils"
)
const (
@@ -24,7 +23,7 @@ const (
)
var (
reporter = utils.Report
reporter func(string)
lock sync.RWMutex
lessExecutor = executors.NewLessExecutor(time.Minute * 5)
dropped int32

View File

@@ -7,7 +7,7 @@ import (
"time"
"github.com/tal-tech/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
const (
@@ -24,17 +24,17 @@ var (
func init() {
cpus, err := perCpuUsage()
lang.Must(err)
logx.Must(err)
cores = uint64(len(cpus))
sets, err := cpuSets()
lang.Must(err)
logx.Must(err)
quota = float64(len(sets))
cq, err := cpuQuota()
if err == nil {
if cq != -1 {
period, err := cpuPeriod()
lang.Must(err)
logx.Must(err)
limit := float64(cq) / float64(period)
if limit < quota {
@@ -44,10 +44,10 @@ func init() {
}
preSystem, err = systemCpuUsage()
lang.Must(err)
logx.Must(err)
preTotal, err = totalCpuUsage()
lang.Must(err)
logx.Must(err)
}
func RefreshCpu() uint64 {

View File

@@ -5,7 +5,6 @@ import (
"time"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/stat"
@@ -33,7 +32,7 @@ type delayTask struct {
func init() {
var err error
timingWheel, err = collection.NewTimingWheel(time.Second, timingWheelSlots, clean)
lang.Must(err)
logx.Must(err)
proc.AddShutdownListener(func() {
timingWheel.Drain(clean)

View File

@@ -212,10 +212,12 @@ func TestRedis_Persist(t *testing.T) {
assert.Nil(t, err)
assert.False(t, ok)
err = client.Expire("key", 5)
assert.Nil(t, err)
ok, err = client.Persist("key")
assert.Nil(t, err)
assert.True(t, ok)
err = client.Expireat("key", time.Now().Unix()+5)
assert.Nil(t, err)
ok, err = client.Persist("key")
assert.Nil(t, err)
assert.True(t, ok)
@@ -379,7 +381,7 @@ func TestRedis_SortedSet(t *testing.T) {
rank, err := client.Zrank("key", "value2")
assert.Nil(t, err)
assert.Equal(t, int64(1), rank)
rank, err = client.Zrank("key", "value4")
_, err = client.Zrank("key", "value4")
assert.Equal(t, redis.Nil, err)
num, err := client.Zrem("key", "value2", "value3")
assert.Nil(t, err)

View File

@@ -249,10 +249,12 @@ func TestRedis_Persist(t *testing.T) {
assert.Nil(t, err)
assert.False(t, ok)
err = client.Expire("key", 5)
assert.Nil(t, err)
ok, err = client.Persist("key")
assert.Nil(t, err)
assert.True(t, ok)
err = client.Expireat("key", time.Now().Unix()+5)
assert.Nil(t, err)
ok, err = client.Persist("key")
assert.Nil(t, err)
assert.True(t, ok)
@@ -447,7 +449,7 @@ func TestRedis_SortedSet(t *testing.T) {
rank, err := client.Zrank("key", "value2")
assert.Nil(t, err)
assert.Equal(t, int64(1), rank)
rank, err = client.Zrank("key", "value4")
_, err = client.Zrank("key", "value4")
assert.Equal(t, Nil, err)
num, err := client.Zrem("key", "value2", "value3")
assert.Nil(t, err)
@@ -558,6 +560,7 @@ func TestRedis_Pipelined(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, "1", value)
score, err := client.Zscore("zadd", "zadd")
assert.Nil(t, err)
assert.Equal(t, int64(12), score)
})
}

View File

@@ -63,11 +63,6 @@ func (r *replacer) Replace(text string) string {
i = j - 1
builder.WriteString(r.mapping[string(chars[start:end])])
} else {
if j < size {
end = j + 1
} else {
end = size
}
builder.WriteRune(chars[i])
}
start = -1

View File

@@ -2,7 +2,11 @@ package stringx
import "github.com/tal-tech/go-zero/core/lang"
const defaultMask = '*'
type (
TrieOption func(trie *trieNode)
Trie interface {
Filter(text string) (string, []string, bool)
FindKeywords(text string) []string
@@ -10,6 +14,7 @@ type (
trieNode struct {
node
mask rune
}
scope struct {
@@ -18,8 +23,15 @@ type (
}
)
func NewTrie(words []string) Trie {
func NewTrie(words []string, opts ...TrieOption) Trie {
n := new(trieNode)
for _, opt := range opts {
opt(n)
}
if n.mask == 0 {
n.mask = defaultMask
}
for _, word := range words {
n.add(word)
}
@@ -114,6 +126,12 @@ func (n *trieNode) findKeywordScopes(chars []rune) []scope {
func (n *trieNode) replaceWithAsterisk(chars []rune, start, stop int) {
for i := start; i < stop; i++ {
chars[i] = '*'
chars[i] = n.mask
}
}
func WithMask(mask rune) TrieOption {
return func(n *trieNode) {
n.mask = mask
}
}

View File

@@ -109,25 +109,25 @@ func TestTrie(t *testing.T) {
func TestTrieSingleWord(t *testing.T) {
trie := NewTrie([]string{
"闹",
})
}, WithMask('#'))
output, keywords, ok := trie.Filter("今晚真热闹")
assert.ElementsMatch(t, []string{"闹"}, keywords)
assert.True(t, ok)
assert.Equal(t, "今晚真热*", output)
assert.Equal(t, "今晚真热#", output)
}
func TestTrieOverlap(t *testing.T) {
trie := NewTrie([]string{
"一二三四五",
"二三四五六七八",
})
}, WithMask('#'))
output, keywords, ok := trie.Filter("零一二三四五六七八九十")
assert.ElementsMatch(t, []string{
"一二三四五",
"二三四五六七八",
}, keywords)
assert.True(t, ok)
assert.Equal(t, "零********九十", output)
assert.Equal(t, "零########九十", output)
}
func TestTrieNested(t *testing.T) {
@@ -135,7 +135,7 @@ func TestTrieNested(t *testing.T) {
"一二三",
"一二三四五",
"一二三四五六七八",
})
}, WithMask('#'))
output, keywords, ok := trie.Filter("零一二三四五六七八九十")
assert.ElementsMatch(t, []string{
"一二三",
@@ -143,7 +143,7 @@ func TestTrieNested(t *testing.T) {
"一二三四五六七八",
}, keywords)
assert.True(t, ok)
assert.Equal(t, "零********九十", output)
assert.Equal(t, "零########九十", output)
}
func BenchmarkTrie(b *testing.B) {

View File

@@ -3,7 +3,7 @@ package sysx
import (
"os"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/stringx"
)
var hostname string
@@ -11,7 +11,9 @@ var hostname string
func init() {
var err error
hostname, err = os.Hostname()
lang.Must(err)
if err != nil {
hostname = stringx.RandId()
}
}
func Hostname() string {

View File

@@ -1,5 +0,0 @@
package utils
func Report(content string) {
// TODO: implement the report method
}

View File

@@ -3,8 +3,8 @@
## goctl用途
* 定义api请求
* 根据定义的api自动生成golang(后端), java(iOS & Android), typescript(web & 晓程序)dart(flutter)
* 生成MySQL CURD (https://goctl.xiaoheiban.cn)
* 生成MongoDB CURD (https://goctl.xiaoheiban.cn)
* 生成MySQL CURD+Cache
* 生成MongoDB CURD+Cache
## goctl使用说明
#### goctl参数说明
@@ -179,23 +179,38 @@ service user-api {
* 在定义的get/post/put/delete等请求的handler和logic里增加处理业务逻辑的代码
#### 根据定义好的api文件生成java代码
`goctl api java -api user/user.api -dir ./src`
```shell
goctl api java -api user/user.api -dir ./src
```
#### 根据定义好的api文件生成typescript代码
`goctl api ts -api user/user.api -dir ./src -webapi ***`
ts需要指定webapi所在目录
```shell
goctl api ts -api user/user.api -dir ./src -webapi ***
ts需要指定webapi所在目录
```
#### 根据定义好的api文件生成Dart代码
`goctl api dart -api user/user.api -dir ./src`
```shell
goctl api dart -api user/user.api -dir ./src
```
## 根据mysql ddl或者datasource生成model文件
```shell script
$ goctl model mysql -src={filename} -dir={dir} -cache={true|false}
```
详情参考[model文档](https://github.com/tal-tech/go-zero/blob/master/tools/goctl/model/sql/README.MD)
## 根据定义好的简单go文件生成mongo代码文件(仅限golang使用)
`goctl model mongo -src {{yourDir}}/xiao/service/xhb/user/model/usermodel.go -cache yes`
-src需要提供简单的usermodel.go文件里面只需要提供一个结构体即可
-cache 控制是否需要缓存 yes=需要 no=不需要
src 示例代码如下
```
```shell
goctl model mongo -src {{yourDir}}/xiao/service/xhb/user/model/usermodel.go -cache yes
-src需要提供简单的usermodel.go文件里面只需要提供一个结构体即可
-cache 控制是否需要缓存 yes=需要 no=不需要
src 示例代码如下
```
```go
package model
type User struct {
@@ -210,7 +225,7 @@ type User struct {
o是改字段需要生产的操作函数 可以取得get,find,set 分别表示生成返回单个对象的查询方法,返回多个对象的查询方法,设置该字段方法
生成的目标文件会覆盖该简单go文件
## goctl rpc生成
## goctl rpc生成(业务剥离中,暂未开放)
命令 `goctl rpc proto -proto ${proto} -service ${serviceName} -project ${projectName} -dir ${directory} -shared ${shared}`
如: `goctl rpc proto -proto test.proto -service test -project xjy -dir .`
@@ -261,5 +276,4 @@ type User struct {
│   └── test.go [强制覆盖更新]
└── test.proto
```
- 注意 目前rpc目录生成的proto文件暂不支持import外部proto文件
* 如有不理解的地方随时问Kim/Kevin
- 注意 目前rpc目录生成的proto文件暂不支持import外部proto文件

BIN
doc/images/architecture.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 333 KiB

BIN
doc/images/benchmark.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

BIN
doc/images/trie.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 91 KiB

86
doc/keywords.md Normal file
View File

@@ -0,0 +1,86 @@
# 高效的关键词替换和敏感词过滤工具
## 1. 算法介绍
利用高效的Trie树建立关键词树如下图所示然后依次查找字符串中的相连字符是否形成树的一条路径
<img src="images/trie.png" alt="trie" width="350" />
发现掘金上[这篇文章](https://juejin.im/post/6844903750490914829)写的比较详细,可以一读,具体原理在此不详述。
## 2. 关键词替换
支持关键词重叠,自动选用最长的关键词,代码示例如下:
```go
replacer := stringx.NewReplacer(map[string]string{
"日本": "法国",
"日本的首都": "东京",
"东京": "日本的首都",
})
fmt.Println(replacer.Replace("日本的首都是东京"))
```
可以得到:
```
东京是日本的首都
```
示例代码见`example/stringx/replace/replace.go`
## 3. 查找敏感词
代码示例如下:
```go
filter := stringx.NewTrie([]string{
"AV演员",
"苍井空",
"AV",
"日本AV女优",
"AV演员色情",
})
keywords := filter.FindKeywords("日本AV演员兼电视、电影演员。苍井空AV女优是xx出道, 日本AV女优们最精彩的表演是AV演员色情表演")
fmt.Println(keywords)
```
可以得到:
```
[苍井空 日本AV女优 AV演员色情 AV AV演员]
```
## 4. 敏感词过滤
代码示例如下:
```go
filter := stringx.NewTrie([]string{
"AV演员",
"苍井空",
"AV",
"日本AV女优",
"AV演员色情",
}, stringx.WithMask('?')) // 默认替换为*
safe, keywords, found := filter.Filter("日本AV演员兼电视、电影演员。苍井空AV女优是xx出道, 日本AV女优们最精彩的表演是AV演员色情表演")
fmt.Println(safe)
fmt.Println(keywords)
fmt.Println(found)
```
可以得到:
```
日本????兼电视、电影演员。?????女优是xx出道, ??????们最精彩的表演是??????表演
[苍井空 日本AV女优 AV演员色情 AV AV演员]
true
```
示例代码见`example/stringx/filter/filter.go`
## 5. Benchmark
| Sentences | Keywords | Regex | go-zero |
| --------- | -------- | -------- | ------- |
| 10000 | 10000 | 16min10s | 27.2ms |

View File

@@ -10,6 +10,7 @@ import (
"github.com/tal-tech/go-zero/core/breaker"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"gopkg.in/cheggaaa/pb.v1"
)
@@ -99,7 +100,7 @@ func main() {
gb := breaker.NewBreaker()
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "seconds,state,googleCalls,netflixCalls")

View File

@@ -5,12 +5,12 @@ import (
"time"
"github.com/tal-tech/go-zero/core/discov"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
func main() {
sub, err := discov.NewSubscriber([]string{"etcd.discovery:2379"}, "028F2C35852D", discov.Exclusive())
lang.Must(err)
logx.Must(err)
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

View File

@@ -9,12 +9,13 @@ import (
"time"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/threading"
"gopkg.in/cheggaaa/pb.v1"
)
var (
freq = flag.Int("freq", 100, "frequence")
freq = flag.Int("freq", 100, "frequency")
duration = flag.String("duration", "10s", "duration")
)
@@ -83,8 +84,8 @@ func (m *metric) reset() counting {
return result
}
func runRequests(url string, frequence int, metrics *metric, done <-chan lang.PlaceholderType) {
ticker := time.NewTicker(time.Second / time.Duration(frequence))
func runRequests(url string, frequency int, metrics *metric, done <-chan lang.PlaceholderType) {
ticker := time.NewTicker(time.Second / time.Duration(frequency))
defer ticker.Stop()
for {
@@ -119,14 +120,14 @@ func main() {
flag.Parse()
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "seconds,goodOk,goodFail,goodReject,goodErrs,goodUnknowns,goodDropRatio,"+
"heavyOk,heavyFail,heavyReject,heavyErrs,heavyUnknowns,heavyDropRatio")
var gm, hm metric
dur, err := time.ParseDuration(*duration)
lang.Must(err)
logx.Must(err)
done := make(chan lang.PlaceholderType)
group := threading.NewRoutineGroup()
group.RunSafe(func() {

View File

@@ -13,7 +13,7 @@ import (
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/executors"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx"
"gopkg.in/cheggaaa/pb.v1"
)
@@ -47,7 +47,7 @@ func main() {
lessWriter = executors.NewLessExecutor(interval * total / 100)
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "second,maxFlight,flying,agressiveAvgFlying,lazyAvgFlying,bothAvgFlying")

View File

@@ -11,7 +11,7 @@ import (
"time"
"github.com/tal-tech/go-zero/core/fx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
var (
@@ -27,7 +27,7 @@ func main() {
flag.Parse()
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "seconds,total,pass,fail,drop")

View File

@@ -8,11 +8,11 @@ import (
)
func main() {
exeutor := executors.NewBulkExecutor(func(items []interface{}) {
executor := executors.NewBulkExecutor(func(items []interface{}) {
fmt.Println(len(items))
}, executors.WithBulkTasks(10))
for {
exeutor.Add(1)
executor.Add(1)
time.Sleep(time.Millisecond * 90)
}
}

View File

@@ -2,7 +2,9 @@ package main
import (
"context"
"flag"
"fmt"
"log"
"time"
"github.com/tal-tech/go-zero/core/discov"
@@ -10,13 +12,31 @@ import (
"github.com/tal-tech/go-zero/rpcx"
)
var lb = flag.String("t", "direct", "the load balancer type")
func main() {
cli := rpcx.MustNewClient(rpcx.RpcClientConf{
Etcd: discov.EtcdConf{
Hosts: []string{"localhost:2379"},
Key: "rpcx",
},
})
flag.Parse()
var cli rpcx.Client
switch *lb {
case "direct":
cli = rpcx.MustNewClient(rpcx.RpcClientConf{
Endpoints: []string{
"localhost:3456",
"localhost:3457",
},
})
case "discov":
cli = rpcx.MustNewClient(rpcx.RpcClientConf{
Etcd: discov.EtcdConf{
Hosts: []string{"localhost:2379"},
Key: "rpcx",
},
})
default:
log.Fatal("bad load balancing type")
}
greet := unary.NewGreeterClient(cli.Conn())
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

View File

@@ -0,0 +1,21 @@
package main
import (
"fmt"
"github.com/tal-tech/go-zero/core/stringx"
)
func main() {
filter := stringx.NewTrie([]string{
"AV演员",
"苍井空",
"AV",
"日本AV女优",
"AV演员色情",
}, stringx.WithMask('?'))
safe, keywords, found := filter.Filter("日本AV演员兼电视、电影演员。苍井空AV女优是xx出道, 日本AV女优们最精彩的表演是AV演员色情表演")
fmt.Println(safe)
fmt.Println(keywords)
fmt.Println(found)
}

View File

@@ -0,0 +1,16 @@
package main
import (
"fmt"
"github.com/tal-tech/go-zero/core/stringx"
)
func main() {
replacer := stringx.NewReplacer(map[string]string{
"日本": "法国",
"日本的首都": "东京",
"东京": "日本的首都",
})
fmt.Println(replacer.Replace("日本的首都是东京"))
}

3
go.mod
View File

@@ -13,6 +13,7 @@ require (
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
github.com/go-redis/redis v6.15.7+incompatible
github.com/go-sql-driver/mysql v1.5.0
github.com/go-xorm/builder v0.3.4
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/mock v1.4.3
@@ -22,6 +23,7 @@ require (
github.com/google/uuid v1.1.1
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
github.com/justinas/alice v1.2.0
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
@@ -41,6 +43,7 @@ require (
github.com/stretchr/testify v1.5.1
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
github.com/urfave/cli v1.22.4
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb // indirect
go.etcd.io/etcd v0.0.0-20200402134248-51bdeb39e698
go.uber.org/automaxprocs v1.3.0

8
go.sum
View File

@@ -76,6 +76,10 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-xorm/builder v0.3.4 h1:FxkeGB4Cggdw3tPwutLCpfjng2jugfkg6LDMrd/KsoY=
github.com/go-xorm/builder v0.3.4/go.mod h1:KxkQkNN1DpPKTedxXyTQcmH+rXfvk4LZ9SOOBoZBAxw=
github.com/go-xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a h1:9wScpmSP5A3Bk8V3XHWUcJmYTh+ZnlHVyc+A4oZYS3Y=
github.com/go-xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:56xuuqnHyryaerycW3BfssRdxQstACi0Epw/yC5E2xM=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -134,6 +138,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.14.3 h1:OCJlWkOUoTnl0neNGlf4fUm3TmbEtg
github.com/grpc-ecosystem/grpc-gateway v1.14.3/go.mod h1:6CwZWGDSPRJidgKAtJVvND6soZe6fT7iteq8wDPdhb0=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334 h1:VHgatEHNcBFEB7inlalqfNqw65aNkM1lGX2yt3NmbS8=
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
@@ -266,6 +272,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6 h1:YdYsPAZ2pC6Tow/nPZOPQ96O3hm/ToAkGsPLzedXERk=
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 h1:zzrxE1FKn5ryBNl9eKOeqQ58Y/Qpo3Q9QNxKHX5uzzQ=
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2/go.mod h1:hzfGeIUDq/j97IG+FhNqkowIyEcD88LrW6fyU3K3WqY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=

View File

@@ -1,6 +1,27 @@
# go-zero项目介绍
# go-zero
![Go](https://github.com/tal-tech/go-zero/workflows/Go/badge.svg?branch=master)
[![Go](https://github.com/tal-tech/go-zero/workflows/Go/badge.svg?branch=master)](https://github.com/tal-tech/go-zero/actions)
[![codecov](https://codecov.io/gh/tal-tech/go-zero/branch/master/graph/badge.svg)](https://codecov.io/gh/tal-tech/go-zero)
[![Go Report Card](https://goreportcard.com/badge/github.com/tal-tech/go-zero)](https://goreportcard.com/report/github.com/tal-tech/go-zero)
[![Release](https://img.shields.io/github/v/release/tal-tech/go-zero.svg?style=flat-square)](https://github.com/tal-tech/go-zero)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
## 0. go-zero介绍
go-zero是一个集成了各种工程实践的web和rpc框架。通过弹性设计保障了大并发服务端的稳定性经受了充分的实战检验。
go-zero 包含极简的 API 定义和生成工具 goctl可以根据定义的 api 文件一键生成 Go, iOS, Android, Kotlin, Dart, TypeScript, JavaScript 代码,并可直接运行。
使用go-zero的好处
* 轻松获得支撑千万日活服务的稳定性
* 内建级联超时控制、限流、自适应熔断、自适应降载等微服务治理能力,无需配置和额外代码
* 微服务治理中间件可无缝集成到其它现有框架使用
* 极简的API描述一键生成各端代码
* 自动校验客户端请求参数合法性
* 大量微服务治理和并发工具包
<img src="doc/images/architecture.png" alt="架构图" width="1500" />
## 1. go-zero框架背景
@@ -53,33 +74,20 @@ go-zero是一个集成了各种工程实践的包含web和rpc框架有如下
![弹性设计](doc/images/resilience.jpg)
## 4. go-zero框架收益
* 保障大并发服务端的稳定性,经受了充分的实战检验
* 极简的API定义
* 一键生成Go, iOS, Android, Dart, TypeScript, JavaScript代码并可直接运行
* 服务端自动校验参数合法性
## 5. go-zero近期开发计划
## 4. go-zero近期开发计划
* 自动生成API mock server便于客户端开发
* 自动生成服务端功能测试
## 6. Installation
## 5. Installation
1. 在项目目录下通过如下命令安装:
在项目目录下通过如下命令安装:
```shell
go get -u github.com/tal-tech/go-zero
```
```shell
go get -u github.com/tal-tech/go-zero
```
2. 代码里导入go-zero
```go
import "github.com/tal-tech/go-zero"
```
## 7. Quick Start
## 6. Quick Start
1. 编译goctl工具
@@ -93,7 +101,7 @@ go-zero是一个集成了各种工程实践的包含web和rpc框架有如下
```go
type Request struct {
Name string `path:"name"`
Name string `path:"name,options=you|me"` // 框架自动验证请求参数是否合法
}
type Response struct {
@@ -123,7 +131,6 @@ go-zero是一个集成了各种工程实践的包含web和rpc框架有如下
生成的文件结构如下:
```
.
├── greet
│   ├── etc
│   │   └── greet-api.json // 配置文件
@@ -141,26 +148,24 @@ go-zero是一个集成了各种工程实践的包含web和rpc框架有如下
│   └── types
│   └── types.go // 请求、返回等类型定义
└── greet.api // api描述文件
8 directories, 9 files
```
生成的代码可以直接运行:
```shell
cd greet
go run greet.go -f etc/greet-api.json
```
```
默认侦听在8888端口可以在配置文件里修改可以通过curl请求
```shell
➜ go-zero git:(master) curl -w "\ncode: %{http_code}\n" http://localhost:8888/greet/from/kevin
{"code":0}
code: 200
```
```
编写业务代码:
* 可以在servicecontext.go里面传递依赖给logic比如mysql, redis等
* 在api定义的get/post/put/delete等请求对应的logic里增加业务处理逻辑
@@ -172,6 +177,17 @@ go-zero是一个集成了各种工程实践的包含web和rpc框架有如下
...
```
### 微信交流群
## 7. Benchmark
![benchmark](doc/images/benchmark.png)
[测试代码见这里](https://github.com/smallnest/go-web-framework-benchmark)
## 8. 文档 (逐步完善中)
* [goctl使用帮助](doc/goctl.md)
* [关键字替换和敏感词过滤工具](doc/keywords.md)
## 9. 微信交流群
添加我的微信kevwan请注明go-zero我拉进go-zero社区群🤝

214
rest/engine.go Normal file
View File

@@ -0,0 +1,214 @@
package rest
import (
"errors"
"fmt"
"net/http"
"time"
"github.com/justinas/alice"
"github.com/tal-tech/go-zero/core/codec"
"github.com/tal-tech/go-zero/core/load"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/rest/handler"
"github.com/tal-tech/go-zero/rest/httpx"
"github.com/tal-tech/go-zero/rest/internal"
"github.com/tal-tech/go-zero/rest/router"
)
// use 1000m to represent 100%
const topCpuUsage = 1000
var ErrSignatureConfig = errors.New("bad config for Signature")
type engine struct {
conf RestConf
routes []featuredRoutes
unauthorizedCallback handler.UnauthorizedCallback
unsignedCallback handler.UnsignedCallback
middlewares []Middleware
shedder load.Shedder
priorityShedder load.Shedder
}
func newEngine(c RestConf) *engine {
srv := &engine{
conf: c,
}
if c.CpuThreshold > 0 {
srv.shedder = load.NewAdaptiveShedder(load.WithCpuThreshold(c.CpuThreshold))
srv.priorityShedder = load.NewAdaptiveShedder(load.WithCpuThreshold(
(c.CpuThreshold + topCpuUsage) >> 1))
}
return srv
}
func (s *engine) AddRoutes(r featuredRoutes) {
s.routes = append(s.routes, r)
}
func (s *engine) SetUnauthorizedCallback(callback handler.UnauthorizedCallback) {
s.unauthorizedCallback = callback
}
func (s *engine) SetUnsignedCallback(callback handler.UnsignedCallback) {
s.unsignedCallback = callback
}
func (s *engine) Start() error {
return s.StartWithRouter(router.NewPatRouter())
}
func (s *engine) StartWithRouter(router httpx.Router) error {
if err := s.bindRoutes(router); err != nil {
return err
}
return internal.StartHttp(s.conf.Host, s.conf.Port, router)
}
func (s *engine) appendAuthHandler(fr featuredRoutes, chain alice.Chain,
verifier func(alice.Chain) alice.Chain) alice.Chain {
if fr.jwt.enabled {
if len(fr.jwt.prevSecret) == 0 {
chain = chain.Append(handler.Authorize(fr.jwt.secret,
handler.WithUnauthorizedCallback(s.unauthorizedCallback)))
} else {
chain = chain.Append(handler.Authorize(fr.jwt.secret,
handler.WithPrevSecret(fr.jwt.prevSecret),
handler.WithUnauthorizedCallback(s.unauthorizedCallback)))
}
}
return verifier(chain)
}
func (s *engine) bindFeaturedRoutes(router httpx.Router, fr featuredRoutes, metrics *stat.Metrics) error {
verifier, err := s.signatureVerifier(fr.signature)
if err != nil {
return err
}
for _, route := range fr.routes {
if err := s.bindRoute(fr, router, metrics, route, verifier); err != nil {
return err
}
}
return nil
}
func (s *engine) bindRoute(fr featuredRoutes, router httpx.Router, metrics *stat.Metrics,
route Route, verifier func(chain alice.Chain) alice.Chain) error {
chain := alice.New(
handler.TracingHandler,
s.getLogHandler(),
handler.MaxConns(s.conf.MaxConns),
handler.BreakerHandler(route.Method, route.Path, metrics),
handler.SheddingHandler(s.getShedder(fr.priority), metrics),
handler.TimeoutHandler(time.Duration(s.conf.Timeout)*time.Millisecond),
handler.RecoverHandler,
handler.MetricHandler(metrics),
handler.PromMetricHandler(route.Path),
handler.MaxBytesHandler(s.conf.MaxBytes),
handler.GunzipHandler,
)
chain = s.appendAuthHandler(fr, chain, verifier)
for _, middleware := range s.middlewares {
chain = chain.Append(convertMiddleware(middleware))
}
handle := chain.ThenFunc(route.Handler)
return router.Handle(route.Method, route.Path, handle)
}
func (s *engine) bindRoutes(router httpx.Router) error {
metrics := s.createMetrics()
for _, fr := range s.routes {
if err := s.bindFeaturedRoutes(router, fr, metrics); err != nil {
return err
}
}
return nil
}
func (s *engine) createMetrics() *stat.Metrics {
var metrics *stat.Metrics
if len(s.conf.Name) > 0 {
metrics = stat.NewMetrics(s.conf.Name)
} else {
metrics = stat.NewMetrics(fmt.Sprintf("%s:%d", s.conf.Host, s.conf.Port))
}
return metrics
}
func (s *engine) getLogHandler() func(http.Handler) http.Handler {
if s.conf.Verbose {
return handler.DetailedLogHandler
} else {
return handler.LogHandler
}
}
func (s *engine) getShedder(priority bool) load.Shedder {
if priority && s.priorityShedder != nil {
return s.priorityShedder
}
return s.shedder
}
func (s *engine) signatureVerifier(signature signatureSetting) (func(chain alice.Chain) alice.Chain, error) {
if !signature.enabled {
return func(chain alice.Chain) alice.Chain {
return chain
}, nil
}
if len(signature.PrivateKeys) == 0 {
if signature.Strict {
return nil, ErrSignatureConfig
} else {
return func(chain alice.Chain) alice.Chain {
return chain
}, nil
}
}
decrypters := make(map[string]codec.RsaDecrypter)
for _, key := range signature.PrivateKeys {
fingerprint := key.Fingerprint
file := key.KeyFile
decrypter, err := codec.NewRsaDecrypter(file)
if err != nil {
return nil, err
}
decrypters[fingerprint] = decrypter
}
return func(chain alice.Chain) alice.Chain {
if s.unsignedCallback != nil {
return chain.Append(handler.ContentSecurityHandler(
decrypters, signature.Expiry, signature.Strict, s.unsignedCallback))
} else {
return chain.Append(handler.ContentSecurityHandler(
decrypters, signature.Expiry, signature.Strict))
}
}, nil
}
func (s *engine) use(middleware Middleware) {
s.middlewares = append(s.middlewares, middleware)
}
func convertMiddleware(ware Middleware) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return ware(next.ServeHTTP)
}
}

View File

@@ -217,6 +217,7 @@ func TestContentSecurityHandler(t *testing.T) {
signature: test.signature,
}
req, err := buildRequest(setting)
assert.Nil(t, err)
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
assert.Equal(t, test.statusCode, resp.Code)
@@ -249,6 +250,7 @@ func TestContentSecurityHandler_UnsignedCallback(t *testing.T) {
signature: "badone",
}
req, err := buildRequest(setting)
assert.Nil(t, err)
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusOK, resp.Code)
@@ -285,6 +287,7 @@ func TestContentSecurityHandler_UnsignedCallback_WrongTime(t *testing.T) {
fingerprint: fingerprint,
}
req, err := buildRequest(setting)
assert.Nil(t, err)
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusOK, resp.Code)

View File

@@ -1,6 +1,7 @@
package httpx
import (
"errors"
"net/http"
"strings"
"testing"
@@ -17,6 +18,24 @@ func init() {
logx.Disable()
}
func TestError(t *testing.T) {
const body = "foo"
w := tracedResponseWriter{
headers: make(map[string][]string),
}
Error(&w, errors.New(body))
assert.Equal(t, http.StatusBadRequest, w.code)
assert.Equal(t, body, strings.TrimSpace(w.builder.String()))
}
func TestOk(t *testing.T) {
w := tracedResponseWriter{
headers: make(map[string][]string),
}
Ok(&w)
assert.Equal(t, http.StatusOK, w.code)
}
func TestOkJson(t *testing.T) {
w := tracedResponseWriter{
headers: make(map[string][]string),

View File

@@ -1,170 +0,0 @@
package rest
import (
"log"
"net/http"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/rest/handler"
"github.com/tal-tech/go-zero/rest/httpx"
)
type (
runOptions struct {
start func(*engine) error
}
RunOption func(*Server)
Server struct {
ngin *engine
opts runOptions
}
)
func MustNewServer(c RestConf, opts ...RunOption) *Server {
engine, err := NewServer(c, opts...)
if err != nil {
log.Fatal(err)
}
return engine
}
func NewServer(c RestConf, opts ...RunOption) (*Server, error) {
if err := c.SetUp(); err != nil {
return nil, err
}
server := &Server{
ngin: newEngine(c),
opts: runOptions{
start: func(srv *engine) error {
return srv.Start()
},
},
}
for _, opt := range opts {
opt(server)
}
return server, nil
}
func (e *Server) AddRoutes(rs []Route, opts ...RouteOption) {
r := featuredRoutes{
routes: rs,
}
for _, opt := range opts {
opt(&r)
}
e.ngin.AddRoutes(r)
}
func (e *Server) AddRoute(r Route, opts ...RouteOption) {
e.AddRoutes([]Route{r}, opts...)
}
func (e *Server) Start() {
handleError(e.opts.start(e.ngin))
}
func (e *Server) Stop() {
logx.Close()
}
func (e *Server) Use(middleware Middleware) {
e.ngin.use(middleware)
}
func ToMiddleware(handler func(next http.Handler) http.Handler) Middleware {
return func(handle http.HandlerFunc) http.HandlerFunc {
return handler(handle).ServeHTTP
}
}
func WithJwt(secret string) RouteOption {
return func(r *featuredRoutes) {
validateSecret(secret)
r.jwt.enabled = true
r.jwt.secret = secret
}
}
func WithJwtTransition(secret, prevSecret string) RouteOption {
return func(r *featuredRoutes) {
// why not validate prevSecret, because prevSecret is an already used one,
// even it not meet our requirement, we still need to allow the transition.
validateSecret(secret)
r.jwt.enabled = true
r.jwt.secret = secret
r.jwt.prevSecret = prevSecret
}
}
func WithMiddleware(middleware Middleware, rs ...Route) []Route {
routes := make([]Route, len(rs))
for i := range rs {
route := rs[i]
routes[i] = Route{
Method: route.Method,
Path: route.Path,
Handler: middleware(route.Handler),
}
}
return routes
}
func WithPriority() RouteOption {
return func(r *featuredRoutes) {
r.priority = true
}
}
func WithRouter(router httpx.Router) RunOption {
return func(server *Server) {
server.opts.start = func(srv *engine) error {
return srv.StartWithRouter(router)
}
}
}
func WithSignature(signature SignatureConf) RouteOption {
return func(r *featuredRoutes) {
r.signature.enabled = true
r.signature.Strict = signature.Strict
r.signature.Expiry = signature.Expiry
r.signature.PrivateKeys = signature.PrivateKeys
}
}
func WithUnauthorizedCallback(callback handler.UnauthorizedCallback) RunOption {
return func(engine *Server) {
engine.ngin.SetUnauthorizedCallback(callback)
}
}
func WithUnsignedCallback(callback handler.UnsignedCallback) RunOption {
return func(engine *Server) {
engine.ngin.SetUnsignedCallback(callback)
}
}
func handleError(err error) {
// ErrServerClosed means the server is closed manually
if err == nil || err == http.ErrServerClosed {
return
}
logx.Error(err)
panic(err)
}
func validateSecret(secret string) {
if len(secret) < 8 {
panic("secret's length can't be less than 8")
}
}

View File

@@ -1,214 +1,170 @@
package rest
import (
"errors"
"fmt"
"log"
"net/http"
"time"
"github.com/justinas/alice"
"github.com/tal-tech/go-zero/core/codec"
"github.com/tal-tech/go-zero/core/load"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/rest/handler"
"github.com/tal-tech/go-zero/rest/httpx"
"github.com/tal-tech/go-zero/rest/internal"
"github.com/tal-tech/go-zero/rest/router"
)
// use 1000m to represent 100%
const topCpuUsage = 1000
var ErrSignatureConfig = errors.New("bad config for Signature")
type engine struct {
conf RestConf
routes []featuredRoutes
unauthorizedCallback handler.UnauthorizedCallback
unsignedCallback handler.UnsignedCallback
middlewares []Middleware
shedder load.Shedder
priorityShedder load.Shedder
}
func newEngine(c RestConf) *engine {
srv := &engine{
conf: c,
}
if c.CpuThreshold > 0 {
srv.shedder = load.NewAdaptiveShedder(load.WithCpuThreshold(c.CpuThreshold))
srv.priorityShedder = load.NewAdaptiveShedder(load.WithCpuThreshold(
(c.CpuThreshold + topCpuUsage) >> 1))
type (
runOptions struct {
start func(*engine) error
}
return srv
}
RunOption func(*Server)
func (s *engine) AddRoutes(r featuredRoutes) {
s.routes = append(s.routes, r)
}
func (s *engine) SetUnauthorizedCallback(callback handler.UnauthorizedCallback) {
s.unauthorizedCallback = callback
}
func (s *engine) SetUnsignedCallback(callback handler.UnsignedCallback) {
s.unsignedCallback = callback
}
func (s *engine) Start() error {
return s.StartWithRouter(router.NewPatRouter())
}
func (s *engine) StartWithRouter(router httpx.Router) error {
if err := s.bindRoutes(router); err != nil {
return err
Server struct {
ngin *engine
opts runOptions
}
)
return internal.StartHttp(s.conf.Host, s.conf.Port, router)
}
func (s *engine) appendAuthHandler(fr featuredRoutes, chain alice.Chain,
verifier func(alice.Chain) alice.Chain) alice.Chain {
if fr.jwt.enabled {
if len(fr.jwt.prevSecret) == 0 {
chain = chain.Append(handler.Authorize(fr.jwt.secret,
handler.WithUnauthorizedCallback(s.unauthorizedCallback)))
} else {
chain = chain.Append(handler.Authorize(fr.jwt.secret,
handler.WithPrevSecret(fr.jwt.prevSecret),
handler.WithUnauthorizedCallback(s.unauthorizedCallback)))
}
}
return verifier(chain)
}
func (s *engine) bindFeaturedRoutes(router httpx.Router, fr featuredRoutes, metrics *stat.Metrics) error {
verifier, err := s.signatureVerifier(fr.signature)
func MustNewServer(c RestConf, opts ...RunOption) *Server {
engine, err := NewServer(c, opts...)
if err != nil {
return err
log.Fatal(err)
}
for _, route := range fr.routes {
if err := s.bindRoute(fr, router, metrics, route, verifier); err != nil {
return err
return engine
}
func NewServer(c RestConf, opts ...RunOption) (*Server, error) {
if err := c.SetUp(); err != nil {
return nil, err
}
server := &Server{
ngin: newEngine(c),
opts: runOptions{
start: func(srv *engine) error {
return srv.Start()
},
},
}
for _, opt := range opts {
opt(server)
}
return server, nil
}
func (e *Server) AddRoutes(rs []Route, opts ...RouteOption) {
r := featuredRoutes{
routes: rs,
}
for _, opt := range opts {
opt(&r)
}
e.ngin.AddRoutes(r)
}
func (e *Server) AddRoute(r Route, opts ...RouteOption) {
e.AddRoutes([]Route{r}, opts...)
}
func (e *Server) Start() {
handleError(e.opts.start(e.ngin))
}
func (e *Server) Stop() {
logx.Close()
}
func (e *Server) Use(middleware Middleware) {
e.ngin.use(middleware)
}
func ToMiddleware(handler func(next http.Handler) http.Handler) Middleware {
return func(handle http.HandlerFunc) http.HandlerFunc {
return handler(handle).ServeHTTP
}
}
func WithJwt(secret string) RouteOption {
return func(r *featuredRoutes) {
validateSecret(secret)
r.jwt.enabled = true
r.jwt.secret = secret
}
}
func WithJwtTransition(secret, prevSecret string) RouteOption {
return func(r *featuredRoutes) {
// why not validate prevSecret, because prevSecret is an already used one,
// even it not meet our requirement, we still need to allow the transition.
validateSecret(secret)
r.jwt.enabled = true
r.jwt.secret = secret
r.jwt.prevSecret = prevSecret
}
}
func WithMiddleware(middleware Middleware, rs ...Route) []Route {
routes := make([]Route, len(rs))
for i := range rs {
route := rs[i]
routes[i] = Route{
Method: route.Method,
Path: route.Path,
Handler: middleware(route.Handler),
}
}
return nil
return routes
}
func (s *engine) bindRoute(fr featuredRoutes, router httpx.Router, metrics *stat.Metrics,
route Route, verifier func(chain alice.Chain) alice.Chain) error {
chain := alice.New(
handler.TracingHandler,
s.getLogHandler(),
handler.MaxConns(s.conf.MaxConns),
handler.BreakerHandler(route.Method, route.Path, metrics),
handler.SheddingHandler(s.getShedder(fr.priority), metrics),
handler.TimeoutHandler(time.Duration(s.conf.Timeout)*time.Millisecond),
handler.RecoverHandler,
handler.MetricHandler(metrics),
handler.PromMetricHandler(route.Path),
handler.MaxBytesHandler(s.conf.MaxBytes),
handler.GunzipHandler,
)
chain = s.appendAuthHandler(fr, chain, verifier)
for _, middleware := range s.middlewares {
chain = chain.Append(convertMiddleware(middleware))
func WithPriority() RouteOption {
return func(r *featuredRoutes) {
r.priority = true
}
handle := chain.ThenFunc(route.Handler)
return router.Handle(route.Method, route.Path, handle)
}
func (s *engine) bindRoutes(router httpx.Router) error {
metrics := s.createMetrics()
for _, fr := range s.routes {
if err := s.bindFeaturedRoutes(router, fr, metrics); err != nil {
return err
func WithRouter(router httpx.Router) RunOption {
return func(server *Server) {
server.opts.start = func(srv *engine) error {
return srv.StartWithRouter(router)
}
}
return nil
}
func (s *engine) createMetrics() *stat.Metrics {
var metrics *stat.Metrics
if len(s.conf.Name) > 0 {
metrics = stat.NewMetrics(s.conf.Name)
} else {
metrics = stat.NewMetrics(fmt.Sprintf("%s:%d", s.conf.Host, s.conf.Port))
}
return metrics
}
func (s *engine) getLogHandler() func(http.Handler) http.Handler {
if s.conf.Verbose {
return handler.DetailedLogHandler
} else {
return handler.LogHandler
func WithSignature(signature SignatureConf) RouteOption {
return func(r *featuredRoutes) {
r.signature.enabled = true
r.signature.Strict = signature.Strict
r.signature.Expiry = signature.Expiry
r.signature.PrivateKeys = signature.PrivateKeys
}
}
func (s *engine) getShedder(priority bool) load.Shedder {
if priority && s.priorityShedder != nil {
return s.priorityShedder
}
return s.shedder
}
func (s *engine) signatureVerifier(signature signatureSetting) (func(chain alice.Chain) alice.Chain, error) {
if !signature.enabled {
return func(chain alice.Chain) alice.Chain {
return chain
}, nil
}
if len(signature.PrivateKeys) == 0 {
if signature.Strict {
return nil, ErrSignatureConfig
} else {
return func(chain alice.Chain) alice.Chain {
return chain
}, nil
}
}
decrypters := make(map[string]codec.RsaDecrypter)
for _, key := range signature.PrivateKeys {
fingerprint := key.Fingerprint
file := key.KeyFile
decrypter, err := codec.NewRsaDecrypter(file)
if err != nil {
return nil, err
}
decrypters[fingerprint] = decrypter
}
return func(chain alice.Chain) alice.Chain {
if s.unsignedCallback != nil {
return chain.Append(handler.ContentSecurityHandler(
decrypters, signature.Expiry, signature.Strict, s.unsignedCallback))
} else {
return chain.Append(handler.ContentSecurityHandler(
decrypters, signature.Expiry, signature.Strict))
}
}, nil
}
func (s *engine) use(middleware Middleware) {
s.middlewares = append(s.middlewares, middleware)
}
func convertMiddleware(ware Middleware) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(ware(next.ServeHTTP))
func WithUnauthorizedCallback(callback handler.UnauthorizedCallback) RunOption {
return func(engine *Server) {
engine.ngin.SetUnauthorizedCallback(callback)
}
}
func WithUnsignedCallback(callback handler.UnsignedCallback) RunOption {
return func(engine *Server) {
engine.ngin.SetUnsignedCallback(callback)
}
}
func handleError(err error) {
// ErrServerClosed means the server is closed manually
if err == nil || err == http.ErrServerClosed {
return
}
logx.Error(err)
panic(err)
}
func validateSecret(secret string) {
if len(secret) < 8 {
panic("secret's length can't be less than 8")
}
}

View File

@@ -49,10 +49,10 @@ func NewClient(c RpcClientConf, options ...internal.ClientOption) (Client, error
var client Client
var err error
if len(c.Server) > 0 {
client, err = internal.NewDirectClient(c.Server, opts...)
if len(c.Endpoints) > 0 {
client, err = internal.NewClient(internal.BuildDirectTarget(c.Endpoints), opts...)
} else if err = c.Etcd.Validate(); err == nil {
client, err = internal.NewDiscovClient(c.Etcd.Hosts, c.Etcd.Key, opts...)
client, err = internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
}
if err != nil {
return nil, err
@@ -64,7 +64,7 @@ func NewClient(c RpcClientConf, options ...internal.ClientOption) (Client, error
}
func NewClientNoAuth(c discov.EtcdConf) (Client, error) {
client, err := internal.NewDiscovClient(c.Hosts, c.Key)
client, err := internal.NewClient(internal.BuildDiscovTarget(c.Hosts, c.Key))
if err != nil {
return nil, err
}
@@ -74,6 +74,10 @@ func NewClientNoAuth(c discov.EtcdConf) (Client, error) {
}, nil
}
func NewClientWithTarget(target string, opts ...internal.ClientOption) (Client, error) {
return internal.NewClient(target, opts...)
}
func (rc *RpcClient) Conn() *grpc.ClientConn {
return rc.client.Conn()
}

View File

@@ -21,19 +21,19 @@ type (
}
RpcClientConf struct {
Etcd discov.EtcdConf `json:",optional"`
Server string `json:",optional=!Etcd"`
App string `json:",optional"`
Token string `json:",optional"`
Timeout int64 `json:",optional"`
Etcd discov.EtcdConf `json:",optional"`
Endpoints []string `json:",optional=!Etcd"`
App string `json:",optional"`
Token string `json:",optional"`
Timeout int64 `json:",optional"`
}
)
func NewDirectClientConf(server, app, token string) RpcClientConf {
func NewDirectClientConf(endpoints []string, app, token string) RpcClientConf {
return RpcClientConf{
Server: server,
App: app,
Token: token,
Endpoints: endpoints,
App: app,
Token: token,
}
}

View File

@@ -0,0 +1,62 @@
package auth
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/metadata"
)
func TestParseCredential(t *testing.T) {
tests := []struct {
name string
withNil bool
withEmptyMd bool
app string
token string
}{
{
name: "nil",
withNil: true,
},
{
name: "empty md",
withEmptyMd: true,
},
{
name: "empty",
},
{
name: "valid",
app: "foo",
token: "bar",
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
var ctx context.Context
if test.withNil {
ctx = context.Background()
} else if test.withEmptyMd {
ctx = metadata.NewIncomingContext(context.Background(), metadata.MD{})
} else {
md := metadata.New(map[string]string{
"app": test.app,
"token": test.token,
})
ctx = metadata.NewIncomingContext(context.Background(), md)
}
cred := ParseCredential(ctx)
assert.False(t, cred.RequireTransportSecurity())
m, err := cred.GetRequestMetadata(context.Background())
assert.Nil(t, err)
assert.Equal(t, test.app, m[appKey])
assert.Equal(t, test.token, m[tokenKey])
})
}
}

View File

@@ -21,7 +21,7 @@ import (
const (
Name = "p2c_ewma"
decayTime = int64(time.Millisecond * 600)
decayTime = int64(time.Second * 10) // default value from finagle
forcePick = int64(time.Second)
initSuccess = 1000
throttleSuccess = initSuccess / 2

View File

@@ -3,7 +3,9 @@ package p2c
import (
"context"
"fmt"
"runtime"
"strconv"
"sync"
"testing"
"github.com/stretchr/testify/assert"
@@ -33,19 +35,31 @@ func TestP2cPicker_Pick(t *testing.T) {
tests := []struct {
name string
candidates int
threshold float64
}{
{
name: "single",
candidates: 1,
threshold: 0.9,
},
{
name: "two",
candidates: 2,
threshold: 0.5,
},
{
name: "multiple",
candidates: 100,
threshold: 0.95,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
const total = 10000
builder := new(p2cPickerBuilder)
ready := make(map[resolver.Address]balancer.SubConn)
for i := 0; i < test.candidates; i++ {
@@ -55,7 +69,9 @@ func TestP2cPicker_Pick(t *testing.T) {
}
picker := builder.Build(ready)
for i := 0; i < 10000; i++ {
var wg sync.WaitGroup
wg.Add(total)
for i := 0; i < total; i++ {
_, done, err := picker.Pick(context.Background(), balancer.PickInfo{
FullMethodName: "/",
Ctx: context.Background(),
@@ -64,11 +80,16 @@ func TestP2cPicker_Pick(t *testing.T) {
if i%100 == 0 {
err = status.Error(codes.DeadlineExceeded, "deadline")
}
done(balancer.DoneInfo{
Err: err,
})
go func() {
runtime.Gosched()
done(balancer.DoneInfo{
Err: err,
})
wg.Done()
}()
}
wg.Wait()
dist := make(map[interface{}]int)
conns := picker.(*p2cPicker).conns
for _, conn := range conns {
@@ -76,7 +97,8 @@ func TestP2cPicker_Pick(t *testing.T) {
}
entropy := mathx.CalcEntropy(dist)
assert.True(t, entropy > .95, fmt.Sprintf("entropy is %f, less than .95", entropy))
assert.True(t, entropy > test.threshold, fmt.Sprintf("entropy is %f, less than %f",
entropy, test.threshold))
})
}
}

View File

@@ -0,0 +1,47 @@
package internal
import (
"context"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
func TestWithStreamClientInterceptors(t *testing.T) {
opts := WithStreamClientInterceptors()
assert.NotNil(t, opts)
}
func TestWithUnaryClientInterceptors(t *testing.T) {
opts := WithUnaryClientInterceptors()
assert.NotNil(t, opts)
}
func TestChainStreamClientInterceptors_zero(t *testing.T) {
interceptors := chainStreamClientInterceptors()
_, err := interceptors(context.Background(), nil, new(grpc.ClientConn), "/foo",
func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, nil
})
assert.Nil(t, err)
}
func TestChainStreamClientInterceptors_one(t *testing.T) {
var called int32
interceptors := chainStreamClientInterceptors(func(ctx context.Context, desc *grpc.StreamDesc,
cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (
grpc.ClientStream, error) {
atomic.AddInt32(&called, 1)
return nil, nil
})
_, err := interceptors(context.Background(), nil, new(grpc.ClientConn), "/foo",
func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, nil
})
assert.Nil(t, err)
assert.Equal(t, int32(1), atomic.LoadInt32(&called))
}

View File

@@ -5,12 +5,18 @@ import (
"fmt"
"time"
"github.com/tal-tech/go-zero/rpcx/internal/balancer/p2c"
"github.com/tal-tech/go-zero/rpcx/internal/clientinterceptors"
"github.com/tal-tech/go-zero/rpcx/internal/resolver"
"google.golang.org/grpc"
)
const dialTimeout = time.Second * 3
func init() {
resolver.RegisterResolver()
}
type (
ClientOptions struct {
Timeout time.Duration
@@ -18,8 +24,26 @@ type (
}
ClientOption func(options *ClientOptions)
client struct {
conn *grpc.ClientConn
}
)
func NewClient(target string, opts ...ClientOption) (*client, error) {
opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name)))
conn, err := dial(target, opts...)
if err != nil {
return nil, err
}
return &client{conn: conn}, nil
}
func (c *client) Conn() *grpc.ClientConn {
return c.conn
}
func WithDialOption(opt grpc.DialOption) ClientOption {
return func(options *ClientOptions) {
options.DialOptions = append(options.DialOptions, opt)

View File

@@ -1,12 +1,15 @@
package clientinterceptors
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/breaker"
"github.com/tal-tech/go-zero/core/stat"
rcodes "github.com/tal-tech/go-zero/rpcx/internal/codes"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -49,3 +52,30 @@ func TestBreakerInterceptorDeadlineExceeded(t *testing.T) {
assert.True(t, errs[err] > 0)
assert.True(t, errs[breaker.ErrServiceUnavailable] > 0)
}
func TestBreakerInterceptor(t *testing.T) {
tests := []struct {
name string
err error
}{
{
name: "nil",
err: nil,
},
{
name: "with error",
err: errors.New("mock"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cc := new(grpc.ClientConn)
err := BreakerInterceptor(context.Background(), "/foo", nil, nil, cc,
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
opts ...grpc.CallOption) error {
return test.err
})
assert.Equal(t, test.err, err)
})
}
}

View File

@@ -0,0 +1,37 @@
package clientinterceptors
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
func TestDurationInterceptor(t *testing.T) {
tests := []struct {
name string
err error
}{
{
name: "nil",
err: nil,
},
{
name: "with error",
err: errors.New("mock"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cc := new(grpc.ClientConn)
err := DurationInterceptor(context.Background(), "/foo", nil, nil, cc,
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
opts ...grpc.CallOption) error {
return test.err
})
assert.Equal(t, test.err, err)
})
}
}

View File

@@ -0,0 +1,37 @@
package clientinterceptors
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
func TestPromMetricInterceptor(t *testing.T) {
tests := []struct {
name string
err error
}{
{
name: "nil",
err: nil,
},
{
name: "with error",
err: errors.New("mock"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cc := new(grpc.ClientConn)
err := PromMetricInterceptor(context.Background(), "/foo", nil, nil, cc,
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
opts ...grpc.CallOption) error {
return test.err
})
assert.Equal(t, test.err, err)
})
}
}

View File

@@ -0,0 +1,50 @@
package clientinterceptors
import (
"context"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
func TestTimeoutInterceptor(t *testing.T) {
timeouts := []time.Duration{0, time.Millisecond * 10}
for _, timeout := range timeouts {
t.Run(strconv.FormatInt(int64(timeout), 10), func(t *testing.T) {
interceptor := TimeoutInterceptor(timeout)
cc := new(grpc.ClientConn)
err := interceptor(context.Background(), "/foo", nil, nil, cc,
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
opts ...grpc.CallOption) error {
return nil
},
)
assert.Nil(t, err)
})
}
}
func TestTimeoutInterceptor_timeout(t *testing.T) {
const timeout = time.Millisecond * 10
interceptor := TimeoutInterceptor(timeout)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
cc := new(grpc.ClientConn)
err := interceptor(ctx, "/foo", nil, nil, cc,
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
opts ...grpc.CallOption) error {
defer wg.Done()
tm, ok := ctx.Deadline()
assert.True(t, ok)
assert.True(t, tm.Before(time.Now().Add(timeout+time.Millisecond)))
return nil
})
wg.Wait()
assert.Nil(t, err)
}

View File

@@ -0,0 +1,53 @@
package clientinterceptors
import (
"context"
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func TestTracingInterceptor(t *testing.T) {
var run int32
var wg sync.WaitGroup
wg.Add(1)
cc := new(grpc.ClientConn)
err := TracingInterceptor(context.Background(), "/foo", nil, nil, cc,
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
opts ...grpc.CallOption) error {
defer wg.Done()
atomic.AddInt32(&run, 1)
return nil
})
wg.Wait()
assert.Nil(t, err)
assert.Equal(t, int32(1), atomic.LoadInt32(&run))
}
func TestTracingInterceptor_GrpcFormat(t *testing.T) {
var run int32
var wg sync.WaitGroup
wg.Add(1)
md := metadata.New(map[string]string{
"foo": "bar",
})
carrier, err := trace.Inject(trace.GrpcFormat, md)
assert.Nil(t, err)
ctx, _ := trace.StartServerSpan(context.Background(), carrier, "user", "/foo")
cc := new(grpc.ClientConn)
err = TracingInterceptor(ctx, "/foo", nil, nil, cc,
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
opts ...grpc.CallOption) error {
defer wg.Done()
atomic.AddInt32(&run, 1)
return nil
})
wg.Wait()
assert.Nil(t, err)
assert.Equal(t, int32(1), atomic.LoadInt32(&run))
}

View File

@@ -1,26 +0,0 @@
package internal
import (
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
)
type DirectClient struct {
conn *grpc.ClientConn
}
func NewDirectClient(server string, opts ...ClientOption) (*DirectClient, error) {
opts = append(opts, WithDialOption(grpc.WithBalancerName(roundrobin.Name)))
conn, err := dial(server, opts...)
if err != nil {
return nil, err
}
return &DirectClient{
conn: conn,
}, nil
}
func (c *DirectClient) Conn() *grpc.ClientConn {
return c.conn
}

View File

@@ -1,34 +0,0 @@
package internal
import (
"fmt"
"strings"
"github.com/tal-tech/go-zero/rpcx/internal/balancer/p2c"
"github.com/tal-tech/go-zero/rpcx/internal/resolver"
"google.golang.org/grpc"
)
func init() {
resolver.RegisterResolver()
}
type DiscovClient struct {
conn *grpc.ClientConn
}
func NewDiscovClient(endpoints []string, key string, opts ...ClientOption) (*DiscovClient, error) {
opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name)))
target := fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme,
strings.Join(endpoints, resolver.EndpointSep), key)
conn, err := dial(target, opts...)
if err != nil {
return nil, err
}
return &DiscovClient{conn: conn}, nil
}
func (c *DiscovClient) Conn() *grpc.ClientConn {
return c.conn
}

View File

@@ -0,0 +1,32 @@
package resolver
import (
"strings"
"google.golang.org/grpc/resolver"
)
type directBuilder struct{}
func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
var addrs []resolver.Address
endpoints := strings.FieldsFunc(target.Endpoint, func(r rune) bool {
return r == EndpointSepChar
})
for _, val := range subset(endpoints, subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
cc.UpdateState(resolver.State{
Addresses: addrs,
})
return &nopResolver{cc: cc}, nil
}
func (d *directBuilder) Scheme() string {
return DirectScheme
}

View File

@@ -0,0 +1,52 @@
package resolver
import (
"fmt"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/mathx"
"google.golang.org/grpc/resolver"
)
func TestDirectBuilder_Build(t *testing.T) {
tests := []int{
0,
1,
2,
subsetSize / 2,
subsetSize,
subsetSize * 2,
}
for _, test := range tests {
t.Run(strconv.Itoa(test), func(t *testing.T) {
var servers []string
for i := 0; i < test; i++ {
servers = append(servers, fmt.Sprintf("localhost:%d", i))
}
var b directBuilder
cc := new(mockedClientConn)
_, err := b.Build(resolver.Target{
Scheme: DirectScheme,
Endpoint: strings.Join(servers, ","),
}, cc, resolver.BuildOptions{})
assert.Nil(t, err)
size := mathx.MinInt(test, subsetSize)
assert.Equal(t, size, len(cc.state.Addresses))
m := make(map[string]lang.PlaceholderType)
for _, each := range cc.state.Addresses {
m[each.Addr] = lang.Placeholder
}
assert.Equal(t, size, len(m))
})
}
}
func TestDirectBuilder_Scheme(t *testing.T) {
var b directBuilder
assert.Equal(t, DirectScheme, b.Scheme())
}

View File

@@ -0,0 +1,41 @@
package resolver
import (
"strings"
"github.com/tal-tech/go-zero/core/discov"
"google.golang.org/grpc/resolver"
)
type discovBuilder struct{}
func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
return r == EndpointSepChar
})
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}
update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
cc.UpdateState(resolver.State{
Addresses: addrs,
})
}
sub.AddListener(update)
update()
return &nopResolver{cc: cc}, nil
}
func (d *discovBuilder) Scheme() string {
return DiscovScheme
}

View File

@@ -2,67 +2,34 @@ package resolver
import (
"fmt"
"strings"
"github.com/tal-tech/go-zero/core/discov"
"google.golang.org/grpc/resolver"
)
const (
DiscovScheme = "discov"
EndpointSep = ","
subsetSize = 32
DirectScheme = "direct"
DiscovScheme = "discov"
EndpointSepChar = ','
subsetSize = 32
)
var builder discovBuilder
var (
EndpointSep = fmt.Sprintf("%c", EndpointSepChar)
dirBuilder directBuilder
disBuilder discovBuilder
)
type discovBuilder struct{}
func (b *discovBuilder) Scheme() string {
return DiscovScheme
func RegisterResolver() {
resolver.Register(&dirBuilder)
resolver.Register(&disBuilder)
}
func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
if target.Scheme != DiscovScheme {
return nil, fmt.Errorf("bad scheme: %s", target.Scheme)
}
hosts := strings.Split(target.Authority, EndpointSep)
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}
update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
cc.UpdateState(resolver.State{
Addresses: addrs,
})
}
sub.AddListener(update)
update()
return &discovResolver{
cc: cc,
}, nil
}
type discovResolver struct {
type nopResolver struct {
cc resolver.ClientConn
}
func (r *discovResolver) Close() {
func (r *nopResolver) Close() {
}
func (r *discovResolver) ResolveNow(options resolver.ResolveNowOptions) {
}
func RegisterResolver() {
resolver.Register(&builder)
func (r *nopResolver) ResolveNow(options resolver.ResolveNowOptions) {
}

View File

@@ -0,0 +1,36 @@
package resolver
import (
"testing"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
func TestNopResolver(t *testing.T) {
// make sure ResolveNow & Close don't panic
var r nopResolver
r.ResolveNow(resolver.ResolveNowOptions{})
r.Close()
}
type mockedClientConn struct {
state resolver.State
}
func (m *mockedClientConn) UpdateState(state resolver.State) {
m.state = state
}
func (m *mockedClientConn) ReportError(err error) {
}
func (m *mockedClientConn) NewAddress(addresses []resolver.Address) {
}
func (m *mockedClientConn) NewServiceConfig(serviceConfig string) {
}
func (m *mockedClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
return nil
}

View File

@@ -0,0 +1,200 @@
package serverinterceptors
import (
"context"
"testing"
"github.com/alicebob/miniredis"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/rpcx/internal/auth"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func TestStreamAuthorizeInterceptor(t *testing.T) {
tests := []struct {
name string
app string
token string
strict bool
hasError bool
}{
{
name: "strict=false",
strict: false,
hasError: false,
},
{
name: "strict=true",
strict: true,
hasError: true,
},
{
name: "strict=true,with token",
app: "foo",
token: "bar",
strict: true,
hasError: false,
},
{
name: "strict=true,with error token",
app: "foo",
token: "error",
strict: true,
hasError: true,
},
}
r := miniredis.NewMiniRedis()
assert.Nil(t, r.Start())
defer r.Close()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := redis.NewRedis(r.Addr(), redis.NodeType)
if len(test.app) > 0 {
assert.Nil(t, store.Hset("apps", test.app, test.token))
defer store.Hdel("apps", test.app)
}
authenticator, err := auth.NewAuthenticator(store, "apps", test.strict)
assert.Nil(t, err)
interceptor := StreamAuthorizeInterceptor(authenticator)
md := metadata.New(map[string]string{
"app": "foo",
"token": "bar",
})
ctx := metadata.NewIncomingContext(context.Background(), md)
stream := mockedStream{ctx: ctx}
err = interceptor(nil, stream, nil, func(srv interface{}, stream grpc.ServerStream) error {
return nil
})
if test.hasError {
assert.NotNil(t, err)
} else {
assert.Nil(t, err)
}
})
}
}
func TestUnaryAuthorizeInterceptor(t *testing.T) {
tests := []struct {
name string
app string
token string
strict bool
hasError bool
}{
{
name: "strict=false",
strict: false,
hasError: false,
},
{
name: "strict=true",
strict: true,
hasError: true,
},
{
name: "strict=true,with token",
app: "foo",
token: "bar",
strict: true,
hasError: false,
},
{
name: "strict=true,with error token",
app: "foo",
token: "error",
strict: true,
hasError: true,
},
}
r := miniredis.NewMiniRedis()
assert.Nil(t, r.Start())
defer r.Close()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := redis.NewRedis(r.Addr(), redis.NodeType)
if len(test.app) > 0 {
assert.Nil(t, store.Hset("apps", test.app, test.token))
defer store.Hdel("apps", test.app)
}
authenticator, err := auth.NewAuthenticator(store, "apps", test.strict)
assert.Nil(t, err)
interceptor := UnaryAuthorizeInterceptor(authenticator)
md := metadata.New(map[string]string{
"app": "foo",
"token": "bar",
})
ctx := metadata.NewIncomingContext(context.Background(), md)
_, err = interceptor(ctx, nil, nil,
func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})
if test.hasError {
assert.NotNil(t, err)
} else {
assert.Nil(t, err)
}
if test.strict {
_, err = interceptor(context.Background(), nil, nil,
func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})
assert.NotNil(t, err)
var md metadata.MD
ctx := metadata.NewIncomingContext(context.Background(), md)
_, err = interceptor(ctx, nil, nil,
func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})
assert.NotNil(t, err)
md = metadata.New(map[string]string{
"app": "",
"token": "",
})
ctx = metadata.NewIncomingContext(context.Background(), md)
_, err = interceptor(ctx, nil, nil,
func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})
assert.NotNil(t, err)
}
})
}
}
type mockedStream struct {
ctx context.Context
}
func (m mockedStream) SetHeader(md metadata.MD) error {
return nil
}
func (m mockedStream) SendHeader(md metadata.MD) error {
return nil
}
func (m mockedStream) SetTrailer(md metadata.MD) {
}
func (m mockedStream) Context() context.Context {
return m.ctx
}
func (m mockedStream) SendMsg(v interface{}) error {
return nil
}
func (m mockedStream) RecvMsg(v interface{}) error {
return nil
}

View File

@@ -0,0 +1,31 @@
package serverinterceptors
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/logx"
"google.golang.org/grpc"
)
func init() {
logx.Disable()
}
func TestStreamCrashInterceptor(t *testing.T) {
err := StreamCrashInterceptor(nil, nil, nil, func(
srv interface{}, stream grpc.ServerStream) error {
panic("mock panic")
})
assert.NotNil(t, err)
}
func TestUnaryCrashInterceptor(t *testing.T) {
interceptor := UnaryCrashInterceptor()
_, err := interceptor(context.Background(), nil, nil,
func(ctx context.Context, req interface{}) (interface{}, error) {
panic("mock panic")
})
assert.NotNil(t, err)
}

View File

@@ -33,12 +33,12 @@ var (
)
func UnaryPromMetricInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
interface{}, error) {
startTime := timex.Now()
resp, err := handler(ctx, req)
metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)
metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
return resp, err
}
}

View File

@@ -0,0 +1,19 @@
package serverinterceptors
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
func TestUnaryPromMetricInterceptor(t *testing.T) {
interceptor := UnaryPromMetricInterceptor()
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
FullMethod: "/",
}, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})
assert.Nil(t, err)
}

View File

@@ -0,0 +1,77 @@
package serverinterceptors
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/load"
"github.com/tal-tech/go-zero/core/stat"
"google.golang.org/grpc"
)
func TestUnarySheddingInterceptor(t *testing.T) {
tests := []struct {
name string
allow bool
handleErr error
expect error
}{
{
name: "allow",
allow: true,
handleErr: nil,
expect: nil,
},
{
name: "allow",
allow: true,
handleErr: context.DeadlineExceeded,
expect: context.DeadlineExceeded,
},
{
name: "reject",
allow: false,
handleErr: nil,
expect: load.ErrServiceOverloaded,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
shedder := mockedShedder{allow: test.allow}
metrics := stat.NewMetrics("mock")
interceptor := UnarySheddingInterceptor(shedder, metrics)
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
FullMethod: "/",
}, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, test.handleErr
})
assert.Equal(t, test.expect, err)
})
}
}
type mockedShedder struct {
allow bool
}
func (m mockedShedder) Allow() (load.Promise, error) {
if m.allow {
return mockedPromise{}, nil
} else {
return nil, load.ErrServiceOverloaded
}
}
type mockedPromise struct {
}
func (m mockedPromise) Pass() {
}
func (m mockedPromise) Fail() {
}

View File

@@ -0,0 +1,32 @@
package serverinterceptors
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stat"
"google.golang.org/grpc"
)
func TestUnaryStatInterceptor(t *testing.T) {
metrics := stat.NewMetrics("mock")
interceptor := UnaryStatInterceptor(metrics)
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
FullMethod: "/",
}, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})
assert.Nil(t, err)
}
func TestUnaryStatInterceptor_crash(t *testing.T) {
metrics := stat.NewMetrics("mock")
interceptor := UnaryStatInterceptor(metrics)
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
FullMethod: "/",
}, func(ctx context.Context, req interface{}) (interface{}, error) {
panic("error")
})
assert.NotNil(t, err)
}

View File

@@ -0,0 +1,41 @@
package serverinterceptors
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
func TestUnaryTimeoutInterceptor(t *testing.T) {
interceptor := UnaryTimeoutInterceptor(time.Millisecond * 10)
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
FullMethod: "/",
}, func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, nil
})
assert.Nil(t, err)
}
func TestUnaryTimeoutInterceptor_timeout(t *testing.T) {
const timeout = time.Millisecond * 10
interceptor := UnaryTimeoutInterceptor(timeout)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{
FullMethod: "/",
}, func(ctx context.Context, req interface{}) (interface{}, error) {
defer wg.Done()
tm, ok := ctx.Deadline()
assert.True(t, ok)
assert.True(t, tm.Before(time.Now().Add(timeout+time.Millisecond)))
return nil, nil
})
wg.Wait()
assert.Nil(t, err)
}

View File

@@ -0,0 +1,48 @@
package serverinterceptors
import (
"context"
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/trace/tracespec"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func TestUnaryTracingInterceptor(t *testing.T) {
interceptor := UnaryTracingInterceptor("foo")
var run int32
var wg sync.WaitGroup
wg.Add(1)
_, err := interceptor(context.Background(), nil, &grpc.UnaryServerInfo{
FullMethod: "/",
}, func(ctx context.Context, req interface{}) (interface{}, error) {
defer wg.Done()
atomic.AddInt32(&run, 1)
return nil, nil
})
wg.Wait()
assert.Nil(t, err)
assert.Equal(t, int32(1), atomic.LoadInt32(&run))
}
func TestUnaryTracingInterceptor_GrpcFormat(t *testing.T) {
interceptor := UnaryTracingInterceptor("foo")
var wg sync.WaitGroup
wg.Add(1)
var md metadata.MD
ctx := metadata.NewIncomingContext(context.Background(), md)
_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{
FullMethod: "/",
}, func(ctx context.Context, req interface{}) (interface{}, error) {
defer wg.Done()
assert.True(t, len(ctx.Value(tracespec.TracingKey).(tracespec.Trace).TraceId()) > 0)
assert.True(t, len(ctx.Value(tracespec.TracingKey).(tracespec.Trace).SpanId()) > 0)
return nil, nil
})
wg.Wait()
assert.Nil(t, err)
}

18
rpcx/internal/target.go Normal file
View File

@@ -0,0 +1,18 @@
package internal
import (
"fmt"
"strings"
"github.com/tal-tech/go-zero/rpcx/internal/resolver"
)
func BuildDirectTarget(endpoints []string) string {
return fmt.Sprintf("%s:///%s", resolver.DirectScheme,
strings.Join(endpoints, resolver.EndpointSep))
}
func BuildDiscovTarget(endpoints []string, key string) string {
return fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme,
strings.Join(endpoints, resolver.EndpointSep), key)
}

View File

@@ -0,0 +1,17 @@
package internal
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestBuildDirectTarget(t *testing.T) {
target := BuildDirectTarget([]string{"localhost:123", "localhost:456"})
assert.Equal(t, "direct:///localhost:123,localhost:456", target)
}
func TestBuildDiscovTarget(t *testing.T) {
target := BuildDiscovTarget([]string{"localhost:123", "localhost:456"}, "foo")
assert.Equal(t, "discov://localhost:123,localhost:456/foo", target)
}

View File

@@ -38,11 +38,11 @@ func (p *RpcProxy) TakeConn(ctx context.Context) (*grpc.ClientConn, error) {
return client, nil
}
client, err := NewClient(RpcClientConf{
Server: p.backend,
App: cred.App,
Token: cred.Token,
}, p.options...)
opts := append(p.options, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
App: cred.App,
Token: cred.Token,
})))
client, err := NewClientWithTarget(p.backend, opts...)
if err != nil {
return nil, err
}

View File

@@ -19,29 +19,24 @@ const apiTemplate = `info(
email: {{.gitEmail}}
)
type request struct{
type request struct {
// TODO: add members here and delete this comment
}
type response struct{
type response struct {
// TODO: add members here and delete this comment
}
@server(
port: // TODO: add port here and delete this comment
)
service {{.serviceName}} {
@server(
handler: // TODO: set handler name and delete this comment
)
// TODO: edit the below line
// get /users/id/:userId(request) returns(response)
get /users/id/:userId(request) returns(response)
@server(
handler: // TODO: set handler name and delete this comment
)
// TODO: edit the below line
// post /users/create(request)
post /users/create(request)
}
`

View File

@@ -4,7 +4,7 @@ import (
"errors"
"strings"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
"github.com/urfave/cli"
)
@@ -32,8 +32,8 @@ func DartCommand(c *cli.Context) error {
dir = dir + "/"
}
api.Info.Title = strings.Replace(apiFile, ".api", "", -1)
lang.Must(genData(dir+"data/", api))
lang.Must(genApi(dir+"api/", api))
lang.Must(genVars(dir + "vars/"))
logx.Must(genData(dir+"data/", api))
logx.Must(genApi(dir+"api/", api))
logx.Must(genVars(dir + "vars/"))
return nil
}

View File

@@ -14,7 +14,7 @@ import (
"time"
"github.com/logrusorgru/aurora"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
apiformat "github.com/tal-tech/go-zero/tools/goctl/api/format"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
apiutil "github.com/tal-tech/go-zero/tools/goctl/api/util"
@@ -45,15 +45,15 @@ func GoCommand(c *cli.Context) error {
return err
}
lang.Must(util.MkdirIfNotExist(dir))
lang.Must(genEtc(dir, api))
lang.Must(genConfig(dir))
lang.Must(genMain(dir, api))
lang.Must(genServiceContext(dir, api))
lang.Must(genTypes(dir, api))
lang.Must(genHandlers(dir, api))
lang.Must(genRoutes(dir, api))
lang.Must(genLogic(dir, api))
logx.Must(util.MkdirIfNotExist(dir))
logx.Must(genEtc(dir, api))
logx.Must(genConfig(dir))
logx.Must(genMain(dir, api))
logx.Must(genServiceContext(dir, api))
logx.Must(genTypes(dir, api))
logx.Must(genHandlers(dir, api))
logx.Must(genRoutes(dir, api))
logx.Must(genLogic(dir, api))
// it does not work
format(dir)
createGoModFileIfNeed(dir)
@@ -148,7 +148,6 @@ func createGoModFileIfNeed(dir string) {
}
tempPath = filepath.Dir(tempPath)
if util.FileExists(filepath.Join(tempPath, goModeIdentifier)) {
tempPath = filepath.Dir(tempPath)
hasGoMod = true
break
}

View File

@@ -72,7 +72,7 @@ func genHandler(dir string, group spec.Group, route spec.Route) error {
req = ""
}
var logicResponse string
var writeResponse = "nil, nil"
var writeResponse string
var respWriter = `httpx.WriteJson(w, http.StatusOK, resp)`
if len(route.ResponseType.Name) > 0 {
logicResponse = "resp, err :="

View File

@@ -43,6 +43,7 @@ var mapping = map[string]string{
"head": "http.MethodHead",
"post": "http.MethodPost",
"put": "http.MethodPut",
"patch": "http.MethodPatch",
}
type (

View File

@@ -6,7 +6,7 @@ import (
"strings"
"github.com/logrusorgru/aurora"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
"github.com/tal-tech/go-zero/tools/goctl/util"
"github.com/urfave/cli"
@@ -36,9 +36,9 @@ func JavaCommand(c *cli.Context) error {
packetName = packetName[:len(packetName)-4]
}
lang.Must(util.MkdirIfNotExist(dir))
lang.Must(genPacket(dir, packetName, api))
lang.Must(genComponents(dir, packetName, api))
logx.Must(util.MkdirIfNotExist(dir))
logx.Must(genPacket(dir, packetName, api))
logx.Must(genComponents(dir, packetName, api))
fmt.Println(aurora.Green("Done."))
return nil

View File

@@ -0,0 +1,42 @@
package ktgen
import (
"errors"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
"github.com/urfave/cli"
)
func KtCommand(c *cli.Context) error {
apiFile := c.String("api")
if apiFile == "" {
return errors.New("missing -api")
}
dir := c.String("dir")
if dir == "" {
return errors.New("missing -dir")
}
pkg := c.String("pkg")
if pkg == "" {
return errors.New("missing -pkg")
}
p, e := parser.NewParser(apiFile)
if e != nil {
return e
}
api, e := p.Parse()
if e != nil {
return e
}
e = genBase(dir, pkg, api)
if e != nil {
return e
}
e = genApi(dir, pkg, api)
if e != nil {
return e
}
return nil
}

View File

@@ -0,0 +1,77 @@
package ktgen
import (
"log"
"strings"
"text/template"
"github.com/iancoleman/strcase"
"github.com/tal-tech/go-zero/tools/goctl/api/util"
)
var funcsMap = template.FuncMap{
"lowCamelCase": lowCamelCase,
"routeToFuncName": routeToFuncName,
"parseType": parseType,
"add": add,
"upperCase": upperCase,
}
func lowCamelCase(s string) string {
if len(s) < 1 {
return ""
}
s = util.ToCamelCase(util.ToSnakeCase(s))
return util.ToLower(s[:1]) + s[1:]
}
func routeToFuncName(method, path string) string {
if !strings.HasPrefix(path, "/") {
path = "/" + path
}
path = strings.ReplaceAll(path, "/", "_")
path = strings.ReplaceAll(path, "-", "_")
path = strings.ReplaceAll(path, ":", "With_")
return strings.ToLower(method) + strcase.ToCamel(path)
}
func parseType(t string) string {
t = strings.Replace(t, "*", "", -1)
if strings.HasPrefix(t, "[]") {
return "List<" + parseType(t[2:]) + ">"
}
if strings.HasPrefix(t, "map") {
tys, e := util.DecomposeType(t)
if e != nil {
log.Fatal(e)
}
if len(tys) != 2 {
log.Fatal("Map type number !=2")
}
return "Map<String," + parseType(tys[1]) + ">"
}
switch t {
case "string":
return "String"
case "int", "int32", "int64":
return "Int"
case "float", "float32", "float64":
return "Double"
case "bool":
return "Boolean"
default:
return t
}
}
func add(a, i int) int {
return a + i
}
func upperCase(s string) string {
return strings.ToUpper(s)
}

View File

@@ -0,0 +1,150 @@
package ktgen
import (
"fmt"
"os"
"path/filepath"
"text/template"
"github.com/iancoleman/strcase"
"github.com/tal-tech/go-zero/tools/goctl/api/spec"
)
const (
apiBaseTemplate = `package {{.}}
import com.google.gson.Gson
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.io.BufferedReader
import java.io.InputStreamReader
import java.io.OutputStreamWriter
import java.net.HttpURLConnection
import java.net.URL
const val SERVER = "http://localhost:8080"
suspend fun apiRequest(
method: String,
uri: String,
body: Any = "",
onOk: ((String) -> Unit)? = null,
onFail: ((String) -> Unit)? = null,
eventually: (() -> Unit)? = null
) = withContext(Dispatchers.IO) {
val url = URL(SERVER + uri)
with(url.openConnection() as HttpURLConnection) {
connectTimeout = 3000
requestMethod = method
doInput = true
if (method == "POST" || method == "PUT" || method == "PATCH") {
setRequestProperty("Content-Type", "application/json")
doOutput = true
val data = when (body) {
is String -> {
body
}
else -> {
Gson().toJson(body)
}
}
val wr = OutputStreamWriter(outputStream)
wr.write(data)
wr.flush()
}
try {
if (responseCode >= 400) {
BufferedReader(InputStreamReader(errorStream)).use {
val response = it.readText()
onFail?.invoke(response)
}
return@with
}
//response
BufferedReader(InputStreamReader(inputStream)).use {
val response = it.readText()
onOk?.invoke(response)
}
} catch (e: Exception) {
e.message?.let { onFail?.invoke(it) }
}
}
eventually?.invoke()
}
`
apiTemplate = `package {{with .Info}}{{.Desc}}{{end}}
import com.google.gson.Gson
object {{with .Info}}{{.Title}}{{end}}{
{{range .Types}}
data class {{.Name}}({{$length := (len .Members)}}{{range $i,$item := .Members}}
val {{with $item}}{{lowCamelCase .Name}}: {{parseType .Type}}{{end}}{{if ne $i (add $length -1)}},{{end}}{{end}}
){{end}}
{{with .Service}}
{{range .Routes}}suspend fun {{routeToFuncName .Method .Path}}({{with .RequestType}}{{if ne .Name ""}}
req:{{.Name}},{{end}}{{end}}
onOk: (({{with .ResponseType}}{{.Name}}{{end}}) -> Unit)? = null,
onFail: ((String) -> Unit)? = null,
eventually: (() -> Unit)? = null
){
apiRequest("{{upperCase .Method}}","{{.Path}}",{{with .RequestType}}{{if ne .Name ""}}body=req,{{end}}{{end}} onOk = { {{with .ResponseType}}
onOk?.invoke({{if ne .Name ""}}Gson().fromJson(it,{{.Name}}::class.java){{end}}){{end}}
}, onFail = onFail, eventually =eventually)
}
{{end}}{{end}}
}`
)
func genBase(dir, pkg string, api *spec.ApiSpec) error {
e := os.MkdirAll(dir, 0755)
if e != nil {
return e
}
path := filepath.Join(dir, "BaseApi.kt")
if _, e := os.Stat(path); e == nil {
fmt.Println("BaseApi.kt already exists, skipped it.")
return nil
}
file, e := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if e != nil {
return e
}
defer file.Close()
t, e := template.New("n").Parse(apiBaseTemplate)
if e != nil {
return e
}
e = t.Execute(file, pkg)
if e != nil {
return e
}
return nil
}
func genApi(dir, pkg string, api *spec.ApiSpec) error {
name := strcase.ToCamel(api.Info.Title + "Api")
path := filepath.Join(dir, name+".kt")
api.Info.Title = name
api.Info.Desc = pkg
e := os.MkdirAll(dir, 0755)
if e != nil {
return e
}
file, e := os.OpenFile(path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
if e != nil {
return e
}
defer file.Close()
t, e := template.New("api").Funcs(funcsMap).Parse(apiTemplate)
if e != nil {
return e
}
return t.Execute(file, api)
}

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"os"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
)
@@ -14,8 +14,8 @@ func main() {
}
p, err := parser.NewParser(os.Args[1])
lang.Must(err)
logx.Must(err)
api, err := p.Parse()
lang.Must(err)
logx.Must(err)
fmt.Println(api)
}

View File

@@ -99,8 +99,6 @@ func (s rootState) processToken(token string, annos []spec.Annotation) (state, e
switch token {
case infoDirective:
return newInfoState(s.baseState), nil
//case typeDirective:
//return newTypeState(s.baseState, annos), nil
case serviceDirective:
return newServiceState(s.baseState, annos), nil
default:

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"github.com/logrusorgru/aurora"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
"github.com/tal-tech/go-zero/tools/goctl/util"
"github.com/urfave/cli"
@@ -34,9 +34,9 @@ func TsCommand(c *cli.Context) error {
return err
}
lang.Must(util.MkdirIfNotExist(dir))
lang.Must(genHandler(dir, webApi, caller, api, unwrapApi))
lang.Must(genComponents(dir, api))
logx.Must(util.MkdirIfNotExist(dir))
logx.Must(genHandler(dir, webApi, caller, api, unwrapApi))
logx.Must(genComponents(dir, api))
fmt.Println(aurora.Green("Done."))
return nil

View File

@@ -13,6 +13,10 @@ import (
func writeProperty(writer io.Writer, member spec.Member, indent int, prefixForType func(string) string) error {
writeIndent(writer, indent)
ty, err := goTypeToTs(member.Type, prefixForType)
if err != nil {
return err
}
optionalTag := ""
if member.IsOptional() || member.IsOmitempty() {
optionalTag = "?"
@@ -21,13 +25,14 @@ func writeProperty(writer io.Writer, member spec.Member, indent int, prefixForTy
if err != nil {
return err
}
comment := member.GetComment()
if len(comment) > 0 {
comment = strings.TrimPrefix(comment, "//")
comment = " // " + strings.TrimSpace(comment)
}
if len(member.Docs) > 0 {
_, err = fmt.Fprintf(writer, "%s\n", strings.Join(member.Docs, ""))
fmt.Fprintf(writer, "%s\n", strings.Join(member.Docs, ""))
writeIndent(writer, 1)
}
_, err = fmt.Fprintf(writer, "%s%s: %s%s\n", name, optionalTag, ty, comment)

Some files were not shown because too many files have changed in this diff Show More