Compare commits

..

39 Commits

Author SHA1 Message Date
kevin
8745039877 move lang.Must into logx.Must to make sure output fatal message as json 2020-08-14 15:08:06 +08:00
kevin
9d9399ad10 confirm addition after add called in periodical executor 2020-08-14 11:50:01 +08:00
kevin
e7dd04701c add more tests 2020-08-14 11:24:56 +08:00
kevin
a3d7474ae0 fix data race 2020-08-14 11:03:16 +08:00
kevin
6fdee77fa9 add queue package 2020-08-13 17:00:53 +08:00
kevin
5f084fb7d2 remove pdf 2020-08-13 12:37:39 +08:00
kevin
c8ff9d2f23 Merge branch 'kingxt-master' into master 2020-08-12 16:08:23 +08:00
kevin
78f5e7df87 update workflow 2020-08-12 16:08:15 +08:00
kevin
6d8dc4630f update readme 2020-08-12 15:25:08 +08:00
kevin
03ac41438f rename files 2020-08-12 15:25:08 +08:00
kevin
4ef0b0a8ac update readme 2020-08-12 15:21:47 +08:00
kevin
87b1fba46c rename files 2020-08-12 15:03:07 +08:00
kingxt
cfa6644b0c auto generate go mod if need 2020-08-12 15:02:56 +08:00
kevin
fcaebd73fb remove bodyless check 2020-08-12 15:02:04 +08:00
kevin
a26fc2b672 parse body only if content length > 0 2020-08-12 15:02:04 +08:00
kevin
05c8dd0b9c return ErrBodylessRequest on get method etc. 2020-08-12 15:02:04 +08:00
kevin
d6c7da521e remove bodyless check 2020-08-12 14:44:09 +08:00
kevin
96b6d2ab58 parse body only if content length > 0 2020-08-12 14:37:34 +08:00
kevin
88a73f1042 return ErrBodylessRequest on get method etc. 2020-08-12 14:31:11 +08:00
kevin
9428fface2 export httpx.GetRemoteAddr 2020-08-12 14:15:59 +08:00
kevin
c637f86817 export router 2020-08-12 14:15:59 +08:00
kevin
d4097af627 export token parser for refresh token service 2020-08-12 14:15:59 +08:00
kevin
7da31921c7 remove unused method 2020-08-12 14:15:59 +08:00
kim
47440964cd fix windows slash 2020-08-12 14:15:59 +08:00
kevin
80d55dbc02 export httpx.GetRemoteAddr 2020-08-12 12:25:52 +08:00
kevin
b541403ce2 export router 2020-08-12 12:16:36 +08:00
kevin
a7c02414f3 export token parser for refresh token service 2020-08-12 12:12:31 +08:00
kingxt
196475383b update readme.md 2020-08-12 11:05:00 +08:00
kingxt
fd75f700a2 fix windows bug 2020-08-12 11:05:00 +08:00
kevin
d408a0d49b remove unused method 2020-08-12 11:05:00 +08:00
kim
69d113a46d fix windows slash 2020-08-12 11:05:00 +08:00
kingxt
63c7f44a5f update readme.md 2020-08-12 10:50:17 +08:00
kingxt
19888b7d11 fix windows bug 2020-08-12 10:23:49 +08:00
kevin
d117e31993 use strings.Contains instead of strings.Index 2020-08-12 10:00:59 +08:00
kevin
10cd6053bc refactor rpcx, export WithDialOption and WithTimeout 2020-08-12 10:00:59 +08:00
kevin
d1529fced8 move auth interceptor into serverinterceptors 2020-08-12 10:00:59 +08:00
kevin
4f59fd306a use fmt.Println instead of println 2020-08-12 10:00:59 +08:00
kevin
f77c73eec1 remove unused method 2020-08-12 10:00:59 +08:00
kim
9b0b958f43 fix windows slash 2020-08-11 11:07:47 +08:00
56 changed files with 810 additions and 115 deletions

View File

@@ -25,10 +25,6 @@ jobs:
- name: Get dependencies
run: |
go get -v -t -d ./...
if [ -f Gopkg.toml ]; then
curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
dep ensure
fi
- name: Test
run: go test -v -race ./...

View File

@@ -2,7 +2,7 @@ package discov
import (
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
type (
@@ -26,7 +26,7 @@ func NewFacade(endpoints []string) Facade {
func (f Facade) Client() internal.EtcdClient {
conn, err := f.registry.GetConn(f.endpoints)
lang.Must(err)
logx.Must(err)
return conn
}

View File

@@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
@@ -32,19 +33,21 @@ type (
container TaskContainer
waitGroup sync.WaitGroup
// avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
wgBarrier syncx.Barrier
guarded bool
newTicker func(duration time.Duration) timex.Ticker
lock sync.Mutex
wgBarrier syncx.Barrier
confirmChan chan lang.PlaceholderType
guarded bool
newTicker func(duration time.Duration) timex.Ticker
lock sync.Mutex
}
)
func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
executor := &PeriodicalExecutor{
// buffer 1 to let the caller go quickly
commander: make(chan interface{}, 1),
interval: interval,
container: container,
commander: make(chan interface{}, 1),
interval: interval,
container: container,
confirmChan: make(chan lang.PlaceholderType),
newTicker: func(d time.Duration) timex.Ticker {
return timex.NewTicker(interval)
},
@@ -59,10 +62,12 @@ func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *Per
func (pe *PeriodicalExecutor) Add(task interface{}) {
if vals, ok := pe.addAndCheck(task); ok {
pe.commander <- vals
<-pe.confirmChan
}
}
func (pe *PeriodicalExecutor) Flush() bool {
pe.enterExecution()
return pe.executeTasks(func() interface{} {
pe.lock.Lock()
defer pe.lock.Unlock()
@@ -114,6 +119,8 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
select {
case vals := <-pe.commander:
commanded = true
pe.enterExecution()
pe.confirmChan <- lang.Placeholder
pe.executeTasks(vals)
last = timex.Now()
case <-ticker.Chan():
@@ -135,13 +142,18 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
})
}
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
func (pe *PeriodicalExecutor) doneExecution() {
pe.waitGroup.Done()
}
func (pe *PeriodicalExecutor) enterExecution() {
pe.wgBarrier.Guard(func() {
pe.waitGroup.Add(1)
})
defer pe.wgBarrier.Guard(func() {
pe.waitGroup.Done()
})
}
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
defer pe.doneExecution()
ok := pe.hasTasks(tasks)
if ok {

View File

@@ -106,6 +106,40 @@ func TestPeriodicalExecutor_Bulk(t *testing.T) {
lock.Unlock()
}
func TestPeriodicalExecutor_Wait(t *testing.T) {
var lock sync.Mutex
executer := NewBulkExecutor(func(tasks []interface{}) {
lock.Lock()
defer lock.Unlock()
time.Sleep(10 * time.Millisecond)
}, WithBulkTasks(1), WithBulkInterval(time.Second))
for i := 0; i < 10; i++ {
executer.Add(1)
}
executer.Flush()
executer.Wait()
}
func TestPeriodicalExecutor_WaitFast(t *testing.T) {
const total = 3
var cnt int
var lock sync.Mutex
executer := NewBulkExecutor(func(tasks []interface{}) {
defer func() {
cnt++
}()
lock.Lock()
defer lock.Unlock()
time.Sleep(10 * time.Millisecond)
}, WithBulkTasks(1), WithBulkInterval(10*time.Millisecond))
for i := 0; i < total; i++ {
executer.Add(2)
}
executer.Flush()
executer.Wait()
assert.Equal(t, total, cnt)
}
// go test -benchtime 10s -bench .
func BenchmarkExecutor(b *testing.B) {
b.ReportAllocs()

View File

@@ -1,16 +1,8 @@
package lang
import "log"
var Placeholder PlaceholderType
type (
GenericType = interface{}
PlaceholderType = struct{}
)
func Must(err error) {
if err != nil {
log.Fatal(err)
}
}

View File

@@ -1,7 +0,0 @@
package lang
import "testing"
func TestMust(t *testing.T) {
Must(nil)
}

View File

@@ -17,7 +17,6 @@ import (
"sync/atomic"
"github.com/tal-tech/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/sysx"
"github.com/tal-tech/go-zero/core/timex"
)
@@ -46,6 +45,7 @@ const (
levelInfo = "info"
levelError = "error"
levelSevere = "severe"
levelFatal = "fatal"
levelSlow = "slow"
levelStat = "stat"
@@ -100,7 +100,7 @@ type (
)
func MustSetup(c LogConf) {
lang.Must(SetUp(c))
Must(SetUp(c))
}
// SetUp sets up the logx. If already set up, just return nil.
@@ -210,6 +210,14 @@ func Infof(format string, v ...interface{}) {
infoSync(fmt.Sprintf(format, v...))
}
func Must(err error) {
if err != nil {
msg := formatWithCaller(err.Error(), 3)
output(severeLog, levelFatal, msg)
os.Exit(1)
}
}
func SetLevel(level uint32) {
atomic.StoreUint32(&logLevel, level)
}

View File

@@ -131,6 +131,10 @@ func TestSetLevelWithDuration(t *testing.T) {
assert.Equal(t, 0, writer.builder.Len())
}
func TestMustNil(t *testing.T) {
Must(nil)
}
func BenchmarkCopyByteSliceAppend(b *testing.B) {
for i := 0; i < b.N; i++ {
var buf []byte

View File

@@ -0,0 +1,44 @@
package queue
import (
"errors"
"sync/atomic"
"github.com/tal-tech/go-zero/core/logx"
)
var ErrNoAvailablePusher = errors.New("no available pusher")
type BalancedQueuePusher struct {
name string
pushers []Pusher
index uint64
}
func NewBalancedQueuePusher(pushers []Pusher) Pusher {
return &BalancedQueuePusher{
name: generateName(pushers),
pushers: pushers,
}
}
func (pusher *BalancedQueuePusher) Name() string {
return pusher.name
}
func (pusher *BalancedQueuePusher) Push(message string) error {
size := len(pusher.pushers)
for i := 0; i < size; i++ {
index := atomic.AddUint64(&pusher.index, 1) % uint64(size)
target := pusher.pushers[index]
if err := target.Push(message); err != nil {
logx.Error(err)
} else {
return nil
}
}
return ErrNoAvailablePusher
}

View File

@@ -0,0 +1,43 @@
package queue
import (
"fmt"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
)
func TestBalancedQueuePusher(t *testing.T) {
const numPushers = 100
var pushers []Pusher
var mockedPushers []*mockedPusher
for i := 0; i < numPushers; i++ {
p := &mockedPusher{
name: "pusher:" + strconv.Itoa(i),
}
pushers = append(pushers, p)
mockedPushers = append(mockedPushers, p)
}
pusher := NewBalancedQueuePusher(pushers)
assert.True(t, len(pusher.Name()) > 0)
for i := 0; i < numPushers*1000; i++ {
assert.Nil(t, pusher.Push("item"))
}
var counts []int
for _, p := range mockedPushers {
counts = append(counts, p.count)
}
mean := calcMean(counts)
variance := calcVariance(mean, counts)
assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance))
}
func TestBalancedQueuePusher_NoAvailable(t *testing.T) {
pusher := NewBalancedQueuePusher(nil)
assert.True(t, len(pusher.Name()) == 0)
assert.Equal(t, ErrNoAvailablePusher, pusher.Push("item"))
}

10
core/queue/consumer.go Normal file
View File

@@ -0,0 +1,10 @@
package queue
type (
Consumer interface {
Consume(string) error
OnEvent(event interface{})
}
ConsumerFactory func() (Consumer, error)
)

View File

@@ -0,0 +1,6 @@
package queue
type MessageQueue interface {
Start()
Stop()
}

View File

@@ -0,0 +1,31 @@
package queue
import "github.com/tal-tech/go-zero/core/errorx"
type MultiQueuePusher struct {
name string
pushers []Pusher
}
func NewMultiQueuePusher(pushers []Pusher) Pusher {
return &MultiQueuePusher{
name: generateName(pushers),
pushers: pushers,
}
}
func (pusher *MultiQueuePusher) Name() string {
return pusher.name
}
func (pusher *MultiQueuePusher) Push(message string) error {
var batchError errorx.BatchError
for _, each := range pusher.pushers {
if err := each.Push(message); err != nil {
batchError.Add(err)
}
}
return batchError.Err()
}

View File

@@ -0,0 +1,39 @@
package queue
import (
"fmt"
"math"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
)
func TestMultiQueuePusher(t *testing.T) {
const numPushers = 100
var pushers []Pusher
var mockedPushers []*mockedPusher
for i := 0; i < numPushers; i++ {
p := &mockedPusher{
name: "pusher:" + strconv.Itoa(i),
}
pushers = append(pushers, p)
mockedPushers = append(mockedPushers, p)
}
pusher := NewMultiQueuePusher(pushers)
assert.True(t, len(pusher.Name()) > 0)
for i := 0; i < 1000; i++ {
_ = pusher.Push("item")
}
var counts []int
for _, p := range mockedPushers {
counts = append(counts, p.count)
}
mean := calcMean(counts)
variance := calcVariance(mean, counts)
assert.True(t, math.Abs(mean-1000*(1-failProba)) < 10)
assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance))
}

15
core/queue/producer.go Normal file
View File

@@ -0,0 +1,15 @@
package queue
type (
Producer interface {
AddListener(listener ProduceListener)
Produce() (string, bool)
}
ProduceListener interface {
OnProducerPause()
OnProducerResume()
}
ProducerFactory func() (Producer, error)
)

239
core/queue/queue.go Normal file
View File

@@ -0,0 +1,239 @@
package queue
import (
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/rescue"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/timex"
)
const queueName = "queue"
type (
Queue struct {
name string
metrics *stat.Metrics
producerFactory ProducerFactory
producerRoutineGroup *threading.RoutineGroup
consumerFactory ConsumerFactory
consumerRoutineGroup *threading.RoutineGroup
producerCount int
consumerCount int
active int32
channel chan string
quit chan struct{}
listeners []Listener
eventLock sync.Mutex
eventChannels []chan interface{}
}
Listener interface {
OnPause()
OnResume()
}
Poller interface {
Name() string
Poll() string
}
Pusher interface {
Name() string
Push(string) error
}
)
func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue {
queue := &Queue{
metrics: stat.NewMetrics(queueName),
producerFactory: producerFactory,
producerRoutineGroup: threading.NewRoutineGroup(),
consumerFactory: consumerFactory,
consumerRoutineGroup: threading.NewRoutineGroup(),
producerCount: runtime.NumCPU(),
consumerCount: runtime.NumCPU() << 1,
channel: make(chan string),
quit: make(chan struct{}),
}
queue.SetName(queueName)
return queue
}
func (queue *Queue) AddListener(listener Listener) {
queue.listeners = append(queue.listeners, listener)
}
func (queue *Queue) Broadcast(message interface{}) {
go func() {
queue.eventLock.Lock()
defer queue.eventLock.Unlock()
for _, channel := range queue.eventChannels {
channel <- message
}
}()
}
func (queue *Queue) SetName(name string) {
queue.name = name
queue.metrics.SetName(name)
}
func (queue *Queue) SetNumConsumer(count int) {
queue.consumerCount = count
}
func (queue *Queue) SetNumProducer(count int) {
queue.producerCount = count
}
func (queue *Queue) Start() {
queue.startProducers(queue.producerCount)
queue.startConsumers(queue.consumerCount)
queue.producerRoutineGroup.Wait()
close(queue.channel)
queue.consumerRoutineGroup.Wait()
}
func (queue *Queue) Stop() {
close(queue.quit)
}
func (queue *Queue) consume(eventChan chan interface{}) {
var consumer Consumer
for {
var err error
if consumer, err = queue.consumerFactory(); err != nil {
logx.Errorf("Error on creating consumer: %v", err)
time.Sleep(time.Second)
} else {
break
}
}
for {
select {
case message, ok := <-queue.channel:
if ok {
queue.consumeOne(consumer, message)
} else {
logx.Info("Task channel was closed, quitting consumer...")
return
}
case event := <-eventChan:
consumer.OnEvent(event)
}
}
}
func (queue *Queue) consumeOne(consumer Consumer, message string) {
threading.RunSafe(func() {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
queue.metrics.Add(stat.Task{
Duration: duration,
})
logx.WithDuration(duration).Infof("%s", message)
}()
if err := consumer.Consume(message); err != nil {
logx.Errorf("Error occurred while consuming %v: %v", message, err)
}
})
}
func (queue *Queue) pause() {
for _, listener := range queue.listeners {
listener.OnPause()
}
}
func (queue *Queue) produce() {
var producer Producer
for {
var err error
if producer, err = queue.producerFactory(); err != nil {
logx.Errorf("Error on creating producer: %v", err)
time.Sleep(time.Second)
} else {
break
}
}
atomic.AddInt32(&queue.active, 1)
producer.AddListener(routineListener{
queue: queue,
})
for {
select {
case <-queue.quit:
logx.Info("Quitting producer")
return
default:
if v, ok := queue.produceOne(producer); ok {
queue.channel <- v
}
}
}
}
func (queue *Queue) produceOne(producer Producer) (string, bool) {
// avoid panic quit the producer, just log it and continue
defer rescue.Recover()
return producer.Produce()
}
func (queue *Queue) resume() {
for _, listener := range queue.listeners {
listener.OnResume()
}
}
func (queue *Queue) startConsumers(number int) {
for i := 0; i < number; i++ {
eventChan := make(chan interface{})
queue.eventLock.Lock()
queue.eventChannels = append(queue.eventChannels, eventChan)
queue.eventLock.Unlock()
queue.consumerRoutineGroup.Run(func() {
queue.consume(eventChan)
})
}
}
func (queue *Queue) startProducers(number int) {
for i := 0; i < number; i++ {
queue.producerRoutineGroup.Run(func() {
queue.produce()
})
}
}
type routineListener struct {
queue *Queue
}
func (rl routineListener) OnProducerPause() {
if atomic.AddInt32(&rl.queue.active, -1) <= 0 {
rl.queue.pause()
}
}
func (rl routineListener) OnProducerResume() {
if atomic.AddInt32(&rl.queue.active, 1) == 1 {
rl.queue.resume()
}
}

94
core/queue/queue_test.go Normal file
View File

@@ -0,0 +1,94 @@
package queue
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
const (
consumers = 4
rounds = 100
)
func TestQueue(t *testing.T) {
producer := newMockedProducer(rounds)
consumer := newMockedConsumer()
consumer.wait.Add(consumers)
q := NewQueue(func() (Producer, error) {
return producer, nil
}, func() (Consumer, error) {
return consumer, nil
})
q.AddListener(new(mockedListener))
q.SetName("mockqueue")
q.SetNumConsumer(consumers)
q.SetNumProducer(1)
q.pause()
q.resume()
go func() {
producer.wait.Wait()
q.Stop()
}()
q.Start()
assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
}
type mockedConsumer struct {
count int32
events int32
wait sync.WaitGroup
}
func newMockedConsumer() *mockedConsumer {
return new(mockedConsumer)
}
func (c *mockedConsumer) Consume(string) error {
atomic.AddInt32(&c.count, 1)
return nil
}
func (c *mockedConsumer) OnEvent(interface{}) {
if atomic.AddInt32(&c.events, 1) <= consumers {
c.wait.Done()
}
}
type mockedProducer struct {
total int32
count int32
wait sync.WaitGroup
}
func newMockedProducer(total int32) *mockedProducer {
p := new(mockedProducer)
p.total = total
p.wait.Add(int(total))
return p
}
func (p *mockedProducer) AddListener(listener ProduceListener) {
}
func (p *mockedProducer) Produce() (string, bool) {
if atomic.AddInt32(&p.count, 1) <= p.total {
p.wait.Done()
return "item", true
} else {
time.Sleep(time.Second)
return "", false
}
}
type mockedListener struct {
}
func (l *mockedListener) OnPause() {
}
func (l *mockedListener) OnResume() {
}

12
core/queue/util.go Normal file
View File

@@ -0,0 +1,12 @@
package queue
import "strings"
func generateName(pushers []Pusher) string {
names := make([]string, len(pushers))
for i, pusher := range pushers {
names[i] = pusher.Name()
}
return strings.Join(names, ",")
}

77
core/queue/util_test.go Normal file
View File

@@ -0,0 +1,77 @@
package queue
import (
"errors"
"math"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/mathx"
)
var (
proba = mathx.NewProba()
failProba = 0.01
)
func init() {
logx.Disable()
}
func TestGenerateName(t *testing.T) {
pushers := []Pusher{
&mockedPusher{name: "first"},
&mockedPusher{name: "second"},
&mockedPusher{name: "third"},
}
assert.Equal(t, "first,second,third", generateName(pushers))
}
func TestGenerateNameNil(t *testing.T) {
var pushers []Pusher
assert.Equal(t, "", generateName(pushers))
}
func calcMean(vals []int) float64 {
if len(vals) == 0 {
return 0
}
var result float64
for _, val := range vals {
result += float64(val)
}
return result / float64(len(vals))
}
func calcVariance(mean float64, vals []int) float64 {
if len(vals) == 0 {
return 0
}
var result float64
for _, val := range vals {
result += math.Pow(float64(val)-mean, 2)
}
return result / float64(len(vals))
}
type mockedPusher struct {
name string
count int
}
func (p *mockedPusher) Name() string {
return p.name
}
func (p *mockedPusher) Push(s string) error {
if proba.TrueOnProba(failProba) {
return errors.New("dummy")
}
p.count++
return nil
}

View File

@@ -7,7 +7,7 @@ import (
"time"
"github.com/tal-tech/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
const (
@@ -24,17 +24,17 @@ var (
func init() {
cpus, err := perCpuUsage()
lang.Must(err)
logx.Must(err)
cores = uint64(len(cpus))
sets, err := cpuSets()
lang.Must(err)
logx.Must(err)
quota = float64(len(sets))
cq, err := cpuQuota()
if err == nil {
if cq != -1 {
period, err := cpuPeriod()
lang.Must(err)
logx.Must(err)
limit := float64(cq) / float64(period)
if limit < quota {
@@ -44,10 +44,10 @@ func init() {
}
preSystem, err = systemCpuUsage()
lang.Must(err)
logx.Must(err)
preTotal, err = totalCpuUsage()
lang.Must(err)
logx.Must(err)
}
func RefreshCpu() uint64 {

View File

@@ -5,7 +5,6 @@ import (
"time"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/stat"
@@ -33,7 +32,7 @@ type delayTask struct {
func init() {
var err error
timingWheel, err = collection.NewTimingWheel(time.Second, timingWheelSlots, clean)
lang.Must(err)
logx.Must(err)
proc.AddShutdownListener(func() {
timingWheel.Drain(clean)

View File

@@ -3,7 +3,7 @@ package sysx
import (
"os"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/stringx"
)
var hostname string
@@ -11,7 +11,9 @@ var hostname string
func init() {
var err error
hostname, err = os.Hostname()
lang.Must(err)
if err != nil {
hostname = stringx.RandId()
}
}
func Hostname() string {

Binary file not shown.

View File

@@ -10,6 +10,7 @@ import (
"github.com/tal-tech/go-zero/core/breaker"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"gopkg.in/cheggaaa/pb.v1"
)
@@ -99,7 +100,7 @@ func main() {
gb := breaker.NewBreaker()
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "seconds,state,googleCalls,netflixCalls")

View File

@@ -5,12 +5,12 @@ import (
"time"
"github.com/tal-tech/go-zero/core/discov"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
func main() {
sub, err := discov.NewSubscriber([]string{"etcd.discovery:2379"}, "028F2C35852D", discov.Exclusive())
lang.Must(err)
logx.Must(err)
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/threading"
"gopkg.in/cheggaaa/pb.v1"
)
@@ -119,14 +120,14 @@ func main() {
flag.Parse()
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "seconds,goodOk,goodFail,goodReject,goodErrs,goodUnknowns,goodDropRatio,"+
"heavyOk,heavyFail,heavyReject,heavyErrs,heavyUnknowns,heavyDropRatio")
var gm, hm metric
dur, err := time.ParseDuration(*duration)
lang.Must(err)
logx.Must(err)
done := make(chan lang.PlaceholderType)
group := threading.NewRoutineGroup()
group.RunSafe(func() {

View File

@@ -13,7 +13,7 @@ import (
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/executors"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx"
"gopkg.in/cheggaaa/pb.v1"
)
@@ -47,7 +47,7 @@ func main() {
lessWriter = executors.NewLessExecutor(interval * total / 100)
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "second,maxFlight,flying,agressiveAvgFlying,lazyAvgFlying,bothAvgFlying")

View File

@@ -11,7 +11,7 @@ import (
"time"
"github.com/tal-tech/go-zero/core/fx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
var (
@@ -27,7 +27,7 @@ func main() {
flag.Parse()
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "seconds,total,pass,fail,drop")

View File

@@ -144,27 +144,26 @@ go-zero是一个集成了各种工程实践的包含web和rpc框架有如下
8 directories, 9 files
```
生成的代码可以直接运行:
```shell
```shell
cd greet
go run greet.go -f etc/greet-api.json
```
默认侦听在8888端口可以在配置文件里修改可以通过curl请求
```shell
默认侦听在8888端口可以在配置文件里修改可以通过curl请求
```shell
➜ go-zero git:(master) curl -w "\ncode: %{http_code}\n" http://localhost:8888/greet/from/kevin
{"code":0}
code: 200
```
编写业务代码:
* 可以在servicecontext.go里面传递依赖给logic比如mysql, redis等
编写业务代码:
* 可以在servicecontext.go里面传递依赖给logic比如mysql, redis等
* 在api定义的get/post/put/delete等请求对应的logic里增加业务处理逻辑
4. 可以根据api文件生成前端需要的Java, TypeScript, Dart, JavaScript代码
```shell

View File

@@ -8,7 +8,7 @@ import (
"github.com/dgrijalva/jwt-go"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/rest/internal"
"github.com/tal-tech/go-zero/rest/token"
)
const (
@@ -43,7 +43,7 @@ func Authorize(secret string, opts ...AuthorizeOption) func(http.Handler) http.H
opt(&authOpts)
}
parser := internal.NewTokenParser()
parser := token.NewTokenParser()
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
token, err := parser.ParseToken(r, secret, authOpts.PrevSecret)

View File

@@ -8,7 +8,7 @@ import (
"github.com/tal-tech/go-zero/core/breaker"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/rest/internal"
"github.com/tal-tech/go-zero/rest/httpx"
"github.com/tal-tech/go-zero/rest/internal/security"
)
@@ -22,7 +22,7 @@ func BreakerHandler(method, path string, metrics *stat.Metrics) func(http.Handle
if err != nil {
metrics.AddDrop()
logx.Errorf("[http] dropped, %s - %s - %s",
r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent())
r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent())
w.WriteHeader(http.StatusServiceUnavailable)
return
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/utils"
"github.com/tal-tech/go-zero/rest/httpx"
"github.com/tal-tech/go-zero/rest/internal"
)
@@ -112,10 +113,10 @@ func logBrief(r *http.Request, code int, timer *utils.ElapsedTimer, logs *intern
var buf bytes.Buffer
duration := timer.Duration()
buf.WriteString(fmt.Sprintf("%d - %s - %s - %s - %s",
code, r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration)))
code, r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration)))
if duration > slowThreshold {
logx.Slowf("[HTTP] %d - %s - %s - %s - slowcall(%s)",
code, r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration))
code, r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration))
}
ok := isOkResponse(code)

View File

@@ -7,7 +7,7 @@ import (
"github.com/tal-tech/go-zero/core/load"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/rest/internal"
"github.com/tal-tech/go-zero/rest/httpx"
"github.com/tal-tech/go-zero/rest/internal/security"
)
@@ -35,7 +35,7 @@ func SheddingHandler(shedder load.Shedder, metrics *stat.Metrics) func(http.Hand
metrics.AddDrop()
sheddingStat.IncrementDrop()
logx.Errorf("[http] dropped, %s - %s - %s",
r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent())
r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent())
w.WriteHeader(http.StatusServiceUnavailable)
return
}

View File

@@ -84,7 +84,6 @@ func ParseHeader(headerValue string) map[string]string {
// Parses the post request which contains json in body.
func ParseJsonBody(r *http.Request, v interface{}) error {
var reader io.Reader
if withJsonBody(r) {
reader = io.LimitReader(r.Body, maxBodyLen)
} else {

View File

@@ -1,4 +1,4 @@
package internal
package httpx
import "net/http"

View File

@@ -1,4 +1,4 @@
package internal
package httpx
import (
"net/http"
@@ -16,4 +16,3 @@ func TestGetRemoteAddr(t *testing.T) {
r.Header.Set(xForwardFor, host)
assert.Equal(t, host, GetRemoteAddr(r))
}

View File

@@ -7,6 +7,7 @@ import (
"sync"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/rest/httpx"
)
const LogContext = "request_logs"
@@ -79,5 +80,5 @@ func formatf(r *http.Request, format string, v ...interface{}) string {
}
func formatWithReq(r *http.Request, v string) string {
return fmt.Sprintf("(%s - %s) %s", r.RequestURI, GetRemoteAddr(r), v)
return fmt.Sprintf("(%s - %s) %s", r.RequestURI, httpx.GetRemoteAddr(r), v)
}

View File

@@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/rest/httpx"
"github.com/tal-tech/go-zero/rest/internal/router"
"github.com/tal-tech/go-zero/rest/router"
)
func TestWithMiddleware(t *testing.T) {

View File

@@ -13,7 +13,7 @@ import (
"github.com/tal-tech/go-zero/rest/handler"
"github.com/tal-tech/go-zero/rest/httpx"
"github.com/tal-tech/go-zero/rest/internal"
"github.com/tal-tech/go-zero/rest/internal/router"
"github.com/tal-tech/go-zero/rest/router"
)
// use 1000m to represent 100%

View File

@@ -1,4 +1,4 @@
package internal
package token
import (
"net/http"

View File

@@ -1,4 +1,4 @@
package internal
package token
import (
"net/http"

View File

@@ -4,7 +4,7 @@ import (
"errors"
"strings"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
"github.com/urfave/cli"
)
@@ -32,8 +32,8 @@ func DartCommand(c *cli.Context) error {
dir = dir + "/"
}
api.Info.Title = strings.Replace(apiFile, ".api", "", -1)
lang.Must(genData(dir+"data/", api))
lang.Must(genApi(dir+"api/", api))
lang.Must(genVars(dir + "vars/"))
logx.Must(genData(dir+"data/", api))
logx.Must(genApi(dir+"api/", api))
logx.Must(genVars(dir + "vars/"))
return nil
}

View File

@@ -1,6 +1,7 @@
package gogen
import (
"bytes"
"errors"
"fmt"
"os"
@@ -13,7 +14,7 @@ import (
"time"
"github.com/logrusorgru/aurora"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
apiformat "github.com/tal-tech/go-zero/tools/goctl/api/format"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
apiutil "github.com/tal-tech/go-zero/tools/goctl/api/util"
@@ -44,17 +45,18 @@ func GoCommand(c *cli.Context) error {
return err
}
lang.Must(util.MkdirIfNotExist(dir))
lang.Must(genEtc(dir, api))
lang.Must(genConfig(dir))
lang.Must(genMain(dir, api))
lang.Must(genServiceContext(dir, api))
lang.Must(genTypes(dir, api))
lang.Must(genHandlers(dir, api))
lang.Must(genRoutes(dir, api))
lang.Must(genLogic(dir, api))
logx.Must(util.MkdirIfNotExist(dir))
logx.Must(genEtc(dir, api))
logx.Must(genConfig(dir))
logx.Must(genMain(dir, api))
logx.Must(genServiceContext(dir, api))
logx.Must(genTypes(dir, api))
logx.Must(genHandlers(dir, api))
logx.Must(genRoutes(dir, api))
logx.Must(genLogic(dir, api))
// it does not work
format(dir)
createGoModFileIfNeed(dir)
if err := backupAndSweep(apiFile); err != nil {
return err
@@ -98,7 +100,7 @@ func format(dir string) {
cmd := exec.Command("go", "fmt", "./"+dir+"...")
_, err := cmd.CombinedOutput()
if err != nil {
print(err.Error())
fmt.Println(err.Error())
}
}
@@ -131,3 +133,43 @@ func sweep() error {
return nil
})
}
func createGoModFileIfNeed(dir string) {
absDir, err := filepath.Abs(dir)
if err != nil {
panic(err)
}
var tempPath = absDir
var hasGoMod = false
for {
if tempPath == filepath.Dir(tempPath) {
break
}
tempPath = filepath.Dir(tempPath)
if util.FileExists(filepath.Join(tempPath, goModeIdentifier)) {
tempPath = filepath.Dir(tempPath)
hasGoMod = true
break
}
}
if !hasGoMod {
gopath := os.Getenv("GOPATH")
parent := path.Join(gopath, "src")
pos := strings.Index(absDir, parent)
if pos < 0 {
moduleName := absDir[len(filepath.Dir(absDir))+1:]
cmd := exec.Command("go", "mod", "init", moduleName)
cmd.Dir = dir
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
fmt.Println(err.Error())
}
outStr, errStr := string(stdout.Bytes()), string(stderr.Bytes())
fmt.Printf(outStr + "\n" + errStr)
}
}
}

View File

@@ -28,6 +28,9 @@ func getParentPackage(dir string) (string, error) {
var tempPath = absDir
var hasGoMod = false
for {
if tempPath == filepath.Dir(tempPath) {
break
}
tempPath = filepath.Dir(tempPath)
if goctlutil.FileExists(filepath.Join(tempPath, goModeIdentifier)) {
tempPath = filepath.Dir(tempPath)

View File

@@ -6,7 +6,7 @@ import (
"strings"
"github.com/logrusorgru/aurora"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
"github.com/tal-tech/go-zero/tools/goctl/util"
"github.com/urfave/cli"
@@ -36,9 +36,9 @@ func JavaCommand(c *cli.Context) error {
packetName = packetName[:len(packetName)-4]
}
lang.Must(util.MkdirIfNotExist(dir))
lang.Must(genPacket(dir, packetName, api))
lang.Must(genComponents(dir, packetName, api))
logx.Must(util.MkdirIfNotExist(dir))
logx.Must(genPacket(dir, packetName, api))
logx.Must(genComponents(dir, packetName, api))
fmt.Println(aurora.Green("Done."))
return nil

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"os"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
)
@@ -14,8 +14,8 @@ func main() {
}
p, err := parser.NewParser(os.Args[1])
lang.Must(err)
logx.Must(err)
api, err := p.Parse()
lang.Must(err)
logx.Must(err)
fmt.Println(api)
}

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"github.com/logrusorgru/aurora"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
"github.com/tal-tech/go-zero/tools/goctl/util"
"github.com/urfave/cli"
@@ -34,9 +34,9 @@ func TsCommand(c *cli.Context) error {
return err
}
lang.Must(util.MkdirIfNotExist(dir))
lang.Must(genHandler(dir, webApi, caller, api, unwrapApi))
lang.Must(genComponents(dir, api))
logx.Must(util.MkdirIfNotExist(dir))
logx.Must(genHandler(dir, webApi, caller, api, unwrapApi))
logx.Must(genComponents(dir, api))
fmt.Println(aurora.Green("Done."))
return nil

View File

@@ -8,13 +8,13 @@ import (
"path"
"strings"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/spec"
"github.com/tal-tech/go-zero/tools/goctl/util"
)
func MaybeCreateFile(dir, subdir, file string) (fp *os.File, created bool, err error) {
lang.Must(util.MkdirIfNotExist(path.Join(dir, subdir)))
logx.Must(util.MkdirIfNotExist(path.Join(dir, subdir)))
fpath := path.Join(dir, subdir, file)
if util.FileExists(fpath) {
fmt.Printf("%s exists, ignored generation\n", fpath)

View File

@@ -8,7 +8,6 @@ import (
"github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/core/hash"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/update/config"
"github.com/tal-tech/go-zero/tools/goctl/util"
@@ -56,5 +55,5 @@ func main() {
fs := http.FileServer(http.Dir(c.FileDir))
http.Handle(c.FilePath, http.StripPrefix(c.FilePath, forChksumHandler(path.Join(c.FileDir, filename), fs)))
lang.Must(http.ListenAndServe(c.ListenOn, nil))
logx.Must(http.ListenAndServe(c.ListenOn, nil))
}

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"strings"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/urfave/cli"
)
@@ -14,7 +14,7 @@ func FileModelCommand(c *cli.Context) error {
if len(configFile) == 0 {
return errors.New("missing config value")
}
lang.Must(genModelWithConfigFile(configFile))
logx.Must(genModelWithConfigFile(configFile))
return nil
}
@@ -36,6 +36,6 @@ func CmdModelCommand(c *cli.Context) error {
user := addressArr[0]
host := addressArr[1]
address = fmt.Sprintf("%v@tcp(%v)/information_schema", user, host)
lang.Must(genModelWithDataSource(address, schema, force, redis, nil))
logx.Must(genModelWithDataSource(address, schema, force, redis, nil))
return nil
}