Compare commits
11 Commits
tools/goct
...
tools/goct
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f46eab977 | ||
|
|
ec299085f5 | ||
|
|
7727d70634 | ||
|
|
5f9d101bc6 | ||
|
|
6c2abe7474 | ||
|
|
14a902c1a7 | ||
|
|
5ad6a6d229 | ||
|
|
6f4b97864a | ||
|
|
0e0abc3a95 | ||
|
|
696fda1db4 | ||
|
|
c1d2634427 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -16,7 +16,8 @@
|
||||
**/logs
|
||||
|
||||
# for test purpose
|
||||
adhoc
|
||||
**/adhoc
|
||||
**/testdata
|
||||
|
||||
# gitlab ci
|
||||
.cache
|
||||
|
||||
@@ -3,12 +3,11 @@ package mr
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/errorx"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -42,6 +41,16 @@ type (
|
||||
// Option defines the method to customize the mapreduce.
|
||||
Option func(opts *mapReduceOptions)
|
||||
|
||||
mapperContext struct {
|
||||
ctx context.Context
|
||||
mapper MapFunc
|
||||
source <-chan interface{}
|
||||
panicChan *onceChan
|
||||
collector chan<- interface{}
|
||||
doneChan <-chan lang.PlaceholderType
|
||||
workers int
|
||||
}
|
||||
|
||||
mapReduceOptions struct {
|
||||
ctx context.Context
|
||||
workers int
|
||||
@@ -90,46 +99,72 @@ func FinishVoid(fns ...func()) {
|
||||
|
||||
// ForEach maps all elements from given generate but no output.
|
||||
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
|
||||
drain(Map(generate, func(item interface{}, writer Writer) {
|
||||
mapper(item)
|
||||
}, opts...))
|
||||
}
|
||||
|
||||
// Map maps all elements generated from given generate func, and returns an output channel.
|
||||
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
|
||||
options := buildOptions(opts...)
|
||||
source := buildSource(generate)
|
||||
panicChan := &onceChan{channel: make(chan interface{})}
|
||||
source := buildSource(generate, panicChan)
|
||||
collector := make(chan interface{}, options.workers)
|
||||
done := make(chan lang.PlaceholderType)
|
||||
|
||||
go executeMappers(options.ctx, mapper, source, collector, done, options.workers)
|
||||
go executeMappers(mapperContext{
|
||||
ctx: options.ctx,
|
||||
mapper: func(item interface{}, writer Writer) {
|
||||
mapper(item)
|
||||
},
|
||||
source: source,
|
||||
panicChan: panicChan,
|
||||
collector: collector,
|
||||
doneChan: done,
|
||||
workers: options.workers,
|
||||
})
|
||||
|
||||
return collector
|
||||
for {
|
||||
select {
|
||||
case v := <-panicChan.channel:
|
||||
panic(v)
|
||||
case _, ok := <-collector:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MapReduce maps all elements generated from given generate func,
|
||||
// and reduces the output elements with given reducer.
|
||||
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
|
||||
opts ...Option) (interface{}, error) {
|
||||
source := buildSource(generate)
|
||||
return MapReduceChan(source, mapper, reducer, opts...)
|
||||
panicChan := &onceChan{channel: make(chan interface{})}
|
||||
source := buildSource(generate, panicChan)
|
||||
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||
}
|
||||
|
||||
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
||||
opts ...Option) (interface{}, error) {
|
||||
panicChan := &onceChan{channel: make(chan interface{})}
|
||||
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||
}
|
||||
|
||||
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
|
||||
reducer ReducerFunc, opts ...Option) (interface{}, error) {
|
||||
options := buildOptions(opts...)
|
||||
// output is used to write the final result
|
||||
output := make(chan interface{})
|
||||
defer func() {
|
||||
// reducer can only write once, if more, panic
|
||||
for range output {
|
||||
panic("more than one element written in reducer")
|
||||
}
|
||||
}()
|
||||
|
||||
// collector is used to collect data from mapper, and consume in reducer
|
||||
collector := make(chan interface{}, options.workers)
|
||||
// if done is closed, all mappers and reducer should stop processing
|
||||
done := make(chan lang.PlaceholderType)
|
||||
writer := newGuardedWriter(options.ctx, output, done)
|
||||
var closeOnce sync.Once
|
||||
// use atomic.Value to avoid data race
|
||||
var retErr errorx.AtomicError
|
||||
finish := func() {
|
||||
closeOnce.Do(func() {
|
||||
@@ -151,30 +186,38 @@ func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer Reducer
|
||||
go func() {
|
||||
defer func() {
|
||||
drain(collector)
|
||||
|
||||
if r := recover(); r != nil {
|
||||
cancel(fmt.Errorf("%v", r))
|
||||
} else {
|
||||
finish()
|
||||
panicChan.write(r)
|
||||
}
|
||||
finish()
|
||||
}()
|
||||
|
||||
reducer(collector, writer, cancel)
|
||||
}()
|
||||
|
||||
go executeMappers(options.ctx, func(item interface{}, w Writer) {
|
||||
mapper(item, w, cancel)
|
||||
}, source, collector, done, options.workers)
|
||||
go executeMappers(mapperContext{
|
||||
ctx: options.ctx,
|
||||
mapper: func(item interface{}, w Writer) {
|
||||
mapper(item, w, cancel)
|
||||
},
|
||||
source: source,
|
||||
panicChan: panicChan,
|
||||
collector: collector,
|
||||
doneChan: done,
|
||||
workers: options.workers,
|
||||
})
|
||||
|
||||
select {
|
||||
case <-options.ctx.Done():
|
||||
cancel(context.DeadlineExceeded)
|
||||
return nil, context.DeadlineExceeded
|
||||
case value, ok := <-output:
|
||||
case v := <-panicChan.channel:
|
||||
panic(v)
|
||||
case v, ok := <-output:
|
||||
if err := retErr.Load(); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
return value, nil
|
||||
return v, nil
|
||||
} else {
|
||||
return nil, ErrReduceNoOutput
|
||||
}
|
||||
@@ -221,12 +264,18 @@ func buildOptions(opts ...Option) *mapReduceOptions {
|
||||
return options
|
||||
}
|
||||
|
||||
func buildSource(generate GenerateFunc) chan interface{} {
|
||||
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
|
||||
source := make(chan interface{})
|
||||
threading.GoSafe(func() {
|
||||
defer close(source)
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
panicChan.write(r)
|
||||
}
|
||||
close(source)
|
||||
}()
|
||||
|
||||
generate(source)
|
||||
})
|
||||
}()
|
||||
|
||||
return source
|
||||
}
|
||||
@@ -238,39 +287,43 @@ func drain(channel <-chan interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
|
||||
collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
|
||||
func executeMappers(mCtx mapperContext) {
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
wg.Wait()
|
||||
close(collector)
|
||||
close(mCtx.collector)
|
||||
drain(mCtx.source)
|
||||
}()
|
||||
|
||||
pool := make(chan lang.PlaceholderType, workers)
|
||||
writer := newGuardedWriter(ctx, collector, done)
|
||||
for {
|
||||
var failed int32
|
||||
pool := make(chan lang.PlaceholderType, mCtx.workers)
|
||||
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
|
||||
for atomic.LoadInt32(&failed) == 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-mCtx.ctx.Done():
|
||||
return
|
||||
case <-done:
|
||||
case <-mCtx.doneChan:
|
||||
return
|
||||
case pool <- lang.Placeholder:
|
||||
item, ok := <-input
|
||||
item, ok := <-mCtx.source
|
||||
if !ok {
|
||||
<-pool
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
// better to safely run caller defined method
|
||||
threading.GoSafe(func() {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
atomic.AddInt32(&failed, 1)
|
||||
mCtx.panicChan.write(r)
|
||||
}
|
||||
wg.Done()
|
||||
<-pool
|
||||
}()
|
||||
|
||||
mapper(item, writer)
|
||||
})
|
||||
mCtx.mapper(item, writer)
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -316,3 +369,16 @@ func (gw guardedWriter) Write(v interface{}) {
|
||||
gw.channel <- v
|
||||
}
|
||||
}
|
||||
|
||||
type onceChan struct {
|
||||
channel chan interface{}
|
||||
wrote int32
|
||||
}
|
||||
|
||||
func (oc *onceChan) write(val interface{}) {
|
||||
if atomic.AddInt32(&oc.wrote, 1) > 1 {
|
||||
return
|
||||
}
|
||||
|
||||
oc.channel <- val
|
||||
}
|
||||
|
||||
78
core/mr/mapreduce_fuzz_test.go
Normal file
78
core/mr/mapreduce_fuzz_test.go
Normal file
@@ -0,0 +1,78 @@
|
||||
//go:build go1.18
|
||||
// +build go1.18
|
||||
|
||||
package mr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
func FuzzMapReduce(f *testing.F) {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
f.Add(uint(10), uint(runtime.NumCPU()))
|
||||
f.Fuzz(func(t *testing.T, num uint, workers uint) {
|
||||
n := int64(num)%5000 + 5000
|
||||
genPanic := rand.Intn(100) == 0
|
||||
mapperPanic := rand.Intn(100) == 0
|
||||
reducerPanic := rand.Intn(100) == 0
|
||||
genIdx := rand.Int63n(n)
|
||||
mapperIdx := rand.Int63n(n)
|
||||
reducerIdx := rand.Int63n(n)
|
||||
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||
|
||||
fn := func() (interface{}, error) {
|
||||
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
|
||||
|
||||
return MapReduce(func(source chan<- interface{}) {
|
||||
for i := int64(0); i < n; i++ {
|
||||
source <- i
|
||||
if genPanic && i == genIdx {
|
||||
panic("foo")
|
||||
}
|
||||
}
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int64)
|
||||
if mapperPanic && v == mapperIdx {
|
||||
panic("bar")
|
||||
}
|
||||
writer.Write(v * v)
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var idx int64
|
||||
var total int64
|
||||
for v := range pipe {
|
||||
if reducerPanic && idx == reducerIdx {
|
||||
panic("baz")
|
||||
}
|
||||
total += v.(int64)
|
||||
idx++
|
||||
}
|
||||
writer.Write(total)
|
||||
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||
}
|
||||
|
||||
if genPanic || mapperPanic || reducerPanic {
|
||||
var buf strings.Builder
|
||||
buf.WriteString(fmt.Sprintf("n: %d", n))
|
||||
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
|
||||
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
|
||||
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
|
||||
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
|
||||
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
|
||||
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
|
||||
assert.Panicsf(t, func() { fn() }, buf.String())
|
||||
} else {
|
||||
val, err := fn()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, squareSum, val.(int64))
|
||||
}
|
||||
})
|
||||
}
|
||||
107
core/mr/mapreduce_rand_test.go
Normal file
107
core/mr/mapreduce_rand_test.go
Normal file
@@ -0,0 +1,107 @@
|
||||
//go:build fuzz
|
||||
// +build fuzz
|
||||
|
||||
package mr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
"gopkg.in/cheggaaa/pb.v1"
|
||||
)
|
||||
|
||||
// If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
|
||||
// so we need to simulate the fuzz test in test mode.
|
||||
func TestMapReduceRandom(t *testing.T) {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
const (
|
||||
times = 10000
|
||||
nRange = 500
|
||||
mega = 1024 * 1024
|
||||
)
|
||||
|
||||
bar := pb.New(times).Start()
|
||||
runner := threading.NewTaskRunner(runtime.NumCPU())
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(times)
|
||||
for i := 0; i < times; i++ {
|
||||
runner.Schedule(func() {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
if time.Since(start) > time.Minute {
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
||||
n := rand.Int63n(nRange)%nRange + nRange
|
||||
workers := rand.Int()%50 + runtime.NumCPU()/2
|
||||
genPanic := rand.Intn(100) == 0
|
||||
mapperPanic := rand.Intn(100) == 0
|
||||
reducerPanic := rand.Intn(100) == 0
|
||||
genIdx := rand.Int63n(n)
|
||||
mapperIdx := rand.Int63n(n)
|
||||
reducerIdx := rand.Int63n(n)
|
||||
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||
|
||||
fn := func() (interface{}, error) {
|
||||
return MapReduce(func(source chan<- interface{}) {
|
||||
for i := int64(0); i < n; i++ {
|
||||
source <- i
|
||||
if genPanic && i == genIdx {
|
||||
panic("foo")
|
||||
}
|
||||
}
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int64)
|
||||
if mapperPanic && v == mapperIdx {
|
||||
panic("bar")
|
||||
}
|
||||
writer.Write(v * v)
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var idx int64
|
||||
var total int64
|
||||
for v := range pipe {
|
||||
if reducerPanic && idx == reducerIdx {
|
||||
panic("baz")
|
||||
}
|
||||
total += v.(int64)
|
||||
idx++
|
||||
}
|
||||
writer.Write(total)
|
||||
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||
}
|
||||
|
||||
if genPanic || mapperPanic || reducerPanic {
|
||||
var buf strings.Builder
|
||||
buf.WriteString(fmt.Sprintf("n: %d", n))
|
||||
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
|
||||
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
|
||||
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
|
||||
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
|
||||
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
|
||||
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
|
||||
assert.Panicsf(t, func() { fn() }, buf.String())
|
||||
} else {
|
||||
val, err := fn()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, squareSum, val.(int64))
|
||||
}
|
||||
bar.Increment()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
bar.Finish()
|
||||
}
|
||||
@@ -11,8 +11,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
@@ -124,84 +122,69 @@ func TestForEach(t *testing.T) {
|
||||
t.Run("all", func(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item interface{}) {
|
||||
panic("foo")
|
||||
assert.PanicsWithValue(t, "foo", func() {
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item interface{}) {
|
||||
panic("foo")
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestMap(t *testing.T) {
|
||||
func TestGeneratePanic(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
tests := []struct {
|
||||
mapper MapFunc
|
||||
expect int
|
||||
}{
|
||||
{
|
||||
mapper: func(item interface{}, writer Writer) {
|
||||
v := item.(int)
|
||||
writer.Write(v * v)
|
||||
},
|
||||
expect: 30,
|
||||
},
|
||||
{
|
||||
mapper: func(item interface{}, writer Writer) {
|
||||
v := item.(int)
|
||||
if v%2 == 0 {
|
||||
return
|
||||
}
|
||||
writer.Write(v * v)
|
||||
},
|
||||
expect: 10,
|
||||
},
|
||||
{
|
||||
mapper: func(item interface{}, writer Writer) {
|
||||
v := item.(int)
|
||||
if v%2 == 0 {
|
||||
panic(v)
|
||||
}
|
||||
writer.Write(v * v)
|
||||
},
|
||||
expect: 10,
|
||||
},
|
||||
}
|
||||
t.Run("all", func(t *testing.T) {
|
||||
assert.PanicsWithValue(t, "foo", func() {
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
panic("foo")
|
||||
}, func(item interface{}) {
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
||||
channel := Map(func(source chan<- interface{}) {
|
||||
for i := 1; i < 5; i++ {
|
||||
func TestMapperPanic(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
const tasks = 1000
|
||||
var run int32
|
||||
t.Run("all", func(t *testing.T) {
|
||||
assert.PanicsWithValue(t, "foo", func() {
|
||||
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, test.mapper, WithWorkers(-1))
|
||||
|
||||
var result int
|
||||
for v := range channel {
|
||||
result += v.(int)
|
||||
}
|
||||
|
||||
assert.Equal(t, test.expect, result)
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
atomic.AddInt32(&run, 1)
|
||||
panic("foo")
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
})
|
||||
})
|
||||
}
|
||||
assert.True(t, atomic.LoadInt32(&run) < tasks/2)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMapReduce(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
mapper MapperFunc
|
||||
reducer ReducerFunc
|
||||
expectErr error
|
||||
expectValue interface{}
|
||||
}{
|
||||
{
|
||||
name: "simple",
|
||||
expectErr: nil,
|
||||
expectValue: 30,
|
||||
},
|
||||
{
|
||||
name: "cancel with error",
|
||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
if v%3 == 0 {
|
||||
@@ -212,6 +195,7 @@ func TestMapReduce(t *testing.T) {
|
||||
expectErr: errDummy,
|
||||
},
|
||||
{
|
||||
name: "cancel with nil",
|
||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
if v%3 == 0 {
|
||||
@@ -223,6 +207,7 @@ func TestMapReduce(t *testing.T) {
|
||||
expectValue: nil,
|
||||
},
|
||||
{
|
||||
name: "cancel with more",
|
||||
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
@@ -237,45 +222,68 @@ func TestMapReduce(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
||||
if test.mapper == nil {
|
||||
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
writer.Write(v * v)
|
||||
}
|
||||
}
|
||||
if test.reducer == nil {
|
||||
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
result += item.(int)
|
||||
t.Run("MapReduce", func(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if test.mapper == nil {
|
||||
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
writer.Write(v * v)
|
||||
}
|
||||
writer.Write(result)
|
||||
}
|
||||
}
|
||||
value, err := MapReduce(func(source chan<- interface{}) {
|
||||
for i := 1; i < 5; i++ {
|
||||
source <- i
|
||||
if test.reducer == nil {
|
||||
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
result += item.(int)
|
||||
}
|
||||
writer.Write(result)
|
||||
}
|
||||
}
|
||||
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
|
||||
value, err := MapReduce(func(source chan<- interface{}) {
|
||||
for i := 1; i < 5; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
|
||||
|
||||
assert.Equal(t, test.expectErr, err)
|
||||
assert.Equal(t, test.expectValue, value)
|
||||
})
|
||||
}
|
||||
}
|
||||
assert.Equal(t, test.expectErr, err)
|
||||
assert.Equal(t, test.expectValue, value)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
t.Run("MapReduce", func(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if test.mapper == nil {
|
||||
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
writer.Write(v * v)
|
||||
}
|
||||
}
|
||||
if test.reducer == nil {
|
||||
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
result += item.(int)
|
||||
}
|
||||
writer.Write(result)
|
||||
}
|
||||
}
|
||||
|
||||
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
panic("foo")
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
panic("bar")
|
||||
source := make(chan interface{})
|
||||
go func() {
|
||||
for i := 1; i < 5; i++ {
|
||||
source <- i
|
||||
}
|
||||
close(source)
|
||||
}()
|
||||
|
||||
value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
|
||||
assert.Equal(t, test.expectErr, err)
|
||||
assert.Equal(t, test.expectValue, value)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -302,16 +310,19 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
|
||||
var value uint32
|
||||
tests := []struct {
|
||||
name string
|
||||
mapper MapperFunc
|
||||
reducer VoidReducerFunc
|
||||
expectValue uint32
|
||||
expectErr error
|
||||
}{
|
||||
{
|
||||
name: "simple",
|
||||
expectValue: 30,
|
||||
expectErr: nil,
|
||||
},
|
||||
{
|
||||
name: "cancel with error",
|
||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
if v%3 == 0 {
|
||||
@@ -322,6 +333,7 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
expectErr: errDummy,
|
||||
},
|
||||
{
|
||||
name: "cancel with nil",
|
||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
if v%3 == 0 {
|
||||
@@ -332,6 +344,7 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
expectErr: ErrCancelWithNil,
|
||||
},
|
||||
{
|
||||
name: "cancel with more",
|
||||
reducer: func(pipe <-chan interface{}, cancel func(error)) {
|
||||
for item := range pipe {
|
||||
result := atomic.AddUint32(&value, uint32(item.(int)))
|
||||
@@ -345,7 +358,7 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
atomic.StoreUint32(&value, 0)
|
||||
|
||||
if test.mapper == nil {
|
||||
@@ -400,39 +413,59 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
|
||||
assert.Equal(t, 0, result[1])
|
||||
}
|
||||
|
||||
func TestMapVoid(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
const tasks = 1000
|
||||
var count uint32
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item interface{}) {
|
||||
atomic.AddUint32(&count, 1)
|
||||
})
|
||||
|
||||
assert.Equal(t, tasks, int(count))
|
||||
}
|
||||
|
||||
func TestMapReducePanic(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
v, err := MapReduce(func(source chan<- interface{}) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
for range pipe {
|
||||
panic("panic")
|
||||
}
|
||||
assert.Panics(t, func() {
|
||||
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
for range pipe {
|
||||
panic("panic")
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestMapReducePanicOnce(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||
for i := 0; i < 100; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
if i == 0 {
|
||||
panic("foo")
|
||||
}
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
for range pipe {
|
||||
panic("bar")
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
panic("foo")
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
panic("bar")
|
||||
})
|
||||
})
|
||||
assert.Nil(t, v)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, "panic", err.Error())
|
||||
}
|
||||
|
||||
func TestMapReduceVoidCancel(t *testing.T) {
|
||||
@@ -461,13 +494,13 @@ func TestMapReduceVoidCancel(t *testing.T) {
|
||||
func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var done syncx.AtomicBool
|
||||
var done int32
|
||||
var result []int
|
||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
done.Set(true)
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
if i == defaultWorkers/2 {
|
||||
@@ -482,7 +515,7 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
||||
})
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, "anything", err.Error())
|
||||
assert.True(t, done.True())
|
||||
assert.Equal(t, int32(1), done)
|
||||
}
|
||||
|
||||
func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
||||
@@ -507,34 +540,51 @@ func TestMapReduceVoidPanicInReducer(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
const message = "foo"
|
||||
var done syncx.AtomicBool
|
||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||
assert.Panics(t, func() {
|
||||
var done int32
|
||||
_ = MapReduceVoid(func(source chan<- interface{}) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||
panic(message)
|
||||
}, WithWorkers(1))
|
||||
})
|
||||
}
|
||||
|
||||
func TestForEachWithContext(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var done int32
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
done.Set(true)
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item interface{}) {
|
||||
i := item.(int)
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||
panic(message)
|
||||
}, WithWorkers(1))
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, message, err.Error())
|
||||
assert.True(t, done.True())
|
||||
if i == defaultWorkers/2 {
|
||||
cancel()
|
||||
}
|
||||
}, WithContext(ctx))
|
||||
}
|
||||
|
||||
func TestMapReduceWithContext(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var done syncx.AtomicBool
|
||||
var done int32
|
||||
var result []int
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
done.Set(true)
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item interface{}, writer Writer, c func(error)) {
|
||||
i := item.(int)
|
||||
if i == defaultWorkers/2 {
|
||||
|
||||
@@ -54,7 +54,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/mr"
|
||||
"github.com/zeromicro/go-zero/core/mr"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
@@ -55,7 +55,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/mr"
|
||||
"github.com/zeromicro/go-zero/core/mr"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -87,4 +87,4 @@ More examples: [https://github.com/zeromicro/zero-examples/tree/main/mapreduce](
|
||||
|
||||
## Give a Star! ⭐
|
||||
|
||||
If you like or are using this project to learn or start your solution, please give it a star. Thanks!
|
||||
If you like or are using this project to learn or start your solution, please give it a star. Thanks!
|
||||
|
||||
30
core/prof/runtime.go
Normal file
30
core/prof/runtime.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package prof
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultInterval = time.Second * 5
|
||||
mega = 1024 * 1024
|
||||
)
|
||||
|
||||
func DisplayStats(interval ...time.Duration) {
|
||||
duration := defaultInterval
|
||||
for _, val := range interval {
|
||||
duration = val
|
||||
}
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(duration)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
var m runtime.MemStats
|
||||
runtime.ReadMemStats(&m)
|
||||
fmt.Printf("Goroutines: %d, Alloc: %vm, TotalAlloc: %vm, Sys: %vm, NumGC: %v\n",
|
||||
runtime.NumGoroutine(), m.Alloc/mega, m.TotalAlloc/mega, m.Sys/mega, m.NumGC)
|
||||
}
|
||||
}()
|
||||
}
|
||||
1
go.sum
1
go.sum
@@ -719,7 +719,6 @@ k8s.io/client-go v0.20.12 h1:U75SxTC31BHT9i7CbX/hL4v+U1Wkzy/E1vt5ClDPp3I=
|
||||
k8s.io/client-go v0.20.12/go.mod h1:NBJj6Evp73Xy/4v/O/RDRaH0+3JoxNfjRxkyRgrdbsA=
|
||||
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
|
||||
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
|
||||
k8s.io/klog/v2 v2.4.0 h1:7+X0fUguPyrKEC4WjH8iGDg3laWgMo5tMnRTIGTTxGQ=
|
||||
k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
|
||||
k8s.io/klog/v2 v2.40.1 h1:P4RRucWk/lFOlDdkAr3mc7iWFkgKrZY9qZMAgek06S4=
|
||||
k8s.io/klog/v2 v2.40.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
|
||||
|
||||
@@ -11,6 +11,8 @@
|
||||
[](https://github.com/zeromicro/go-zero)
|
||||
[](https://opensource.org/licenses/MIT)
|
||||
|
||||
> ***缩短从需求到上线的距离***
|
||||
|
||||
**注意:为了满足开源基金会要求,go-zero 从好未来(tal-tech)组织下迁移至中立的 GitHub 组织(zeromicro)。**
|
||||
|
||||
## 0. go-zero 介绍
|
||||
@@ -87,7 +89,7 @@ go-zero 是一个集成了各种工程实践的包含 web 和 rpc 框架,有
|
||||
在项目目录下通过如下命令安装:
|
||||
|
||||
```shell
|
||||
GO111MODULE=on GOPROXY=https://goproxy.cn/,direct go get -u github.com/tal-tech/go-zero
|
||||
GO111MODULE=on GOPROXY=https://goproxy.cn/,direct go get -u github.com/zeromicro/go-zero
|
||||
```
|
||||
|
||||
## 5. Quick Start
|
||||
@@ -104,10 +106,10 @@ GO111MODULE=on GOPROXY=https://goproxy.cn/,direct go get -u github.com/tal-tech/
|
||||
|
||||
```shell
|
||||
# Go 1.15 及之前版本
|
||||
GO111MODULE=on GOPROXY=https://goproxy.cn/,direct go get -u github.com/tal-tech/go-zero/tools/goctl@latest
|
||||
GO111MODULE=on GOPROXY=https://goproxy.cn/,direct go get -u github.com/zeromicro/go-zero/tools/goctl@latest
|
||||
|
||||
# Go 1.16 及以后版本
|
||||
GOPROXY=https://goproxy.cn/,direct go install github.com/tal-tech/go-zero/tools/goctl@latest
|
||||
GOPROXY=https://goproxy.cn/,direct go install github.com/zeromicro/go-zero/tools/goctl@latest
|
||||
```
|
||||
|
||||
确保 goctl 可执行
|
||||
|
||||
@@ -90,7 +90,7 @@ As below, go-zero protects the system with a couple of layers and mechanisms:
|
||||
Run the following command under your project:
|
||||
|
||||
```shell
|
||||
go get -u github.com/tal-tech/go-zero
|
||||
go get -u github.com/zeromicro/go-zero
|
||||
```
|
||||
|
||||
## 6. Quick Start
|
||||
@@ -107,10 +107,10 @@ go get -u github.com/tal-tech/go-zero
|
||||
|
||||
```shell
|
||||
# for Go 1.15 and earlier
|
||||
GO111MODULE=on go get -u github.com/tal-tech/go-zero/tools/goctl@latest
|
||||
GO111MODULE=on go get -u github.com/zeromicro/go-zero/tools/goctl@latest
|
||||
|
||||
# for Go 1.16 and later
|
||||
go install github.com/tal-tech/go-zero/tools/goctl@latest
|
||||
go install github.com/zeromicro/go-zero/tools/goctl@latest
|
||||
```
|
||||
|
||||
make sure goctl is executable.
|
||||
|
||||
@@ -635,3 +635,4 @@ func NewSyntaxLitContext(parser antlr.Parser, parent antlr.ParserRuleContext, in
|
||||
return p
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
module github.com/zeromicro/go-zero/tools/goctl
|
||||
|
||||
go 1.17
|
||||
go 1.15
|
||||
|
||||
require (
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.0
|
||||
@@ -15,23 +15,3 @@ require (
|
||||
github.com/zeromicro/ddl-parser v0.0.0-20210712021150-63520aca7348
|
||||
github.com/zeromicro/go-zero v1.3.0-beta
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
|
||||
github.com/alicebob/miniredis/v2 v2.17.0 // indirect
|
||||
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521184019-c5ad59b459ec // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/go-redis/redis v6.15.9+incompatible // indirect
|
||||
github.com/lib/pq v1.10.4 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.0.1 // indirect
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
|
||||
go.opentelemetry.io/otel v1.3.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.3.0 // indirect
|
||||
go.uber.org/automaxprocs v1.4.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
|
||||
)
|
||||
|
||||
@@ -173,6 +173,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
|
||||
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
@@ -271,6 +272,7 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
|
||||
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4=
|
||||
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||
@@ -379,6 +381,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
|
||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/automaxprocs v1.4.0 h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0=
|
||||
go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=
|
||||
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
|
||||
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
||||
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
|
||||
@@ -414,6 +417,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
|
||||
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
|
||||
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
|
||||
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
|
||||
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
|
||||
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
|
||||
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
|
||||
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
|
||||
@@ -423,6 +427,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
|
||||
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
|
||||
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
@@ -569,10 +574,12 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
|
||||
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
|
||||
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
|
||||
@@ -636,6 +643,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
|
||||
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
|
||||
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"runtime"
|
||||
|
||||
"github.com/logrusorgru/aurora"
|
||||
"github.com/urfave/cli"
|
||||
"github.com/zeromicro/go-zero/core/load"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/tools/goctl/api/apigen"
|
||||
@@ -30,7 +31,6 @@ import (
|
||||
rpc "github.com/zeromicro/go-zero/tools/goctl/rpc/cli"
|
||||
"github.com/zeromicro/go-zero/tools/goctl/tpl"
|
||||
"github.com/zeromicro/go-zero/tools/goctl/upgrade"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
const codeFailure = 1
|
||||
@@ -58,7 +58,7 @@ var commands = []cli.Command{
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "version",
|
||||
Usage: "the target release version of github.com/zeromicro/go-zero to refactor",
|
||||
Usage: "the target release version of github.com/zeromicro/go-zero to migrate",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
)
|
||||
|
||||
// BuildVersion is the version of goctl.
|
||||
const BuildVersion = "1.3.0"
|
||||
const BuildVersion = "1.3.0-20220201"
|
||||
|
||||
var tag = map[string]int{"pre-alpha": 0, "alpha": 1, "pre-bata": 2, "beta": 3, "released": 4, "": 5}
|
||||
|
||||
|
||||
7
tools/goctl/migrate/cancel+polyfill.go
Normal file
7
tools/goctl/migrate/cancel+polyfill.go
Normal file
@@ -0,0 +1,7 @@
|
||||
//go:build windows
|
||||
// +build windows
|
||||
|
||||
package migrate
|
||||
|
||||
func cancelOnSignals() {
|
||||
}
|
||||
31
tools/goctl/migrate/cancel.go
Normal file
31
tools/goctl/migrate/cancel.go
Normal file
@@ -0,0 +1,31 @@
|
||||
//go:build linux || darwin
|
||||
// +build linux darwin
|
||||
|
||||
package migrate
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/tools/goctl/util/console"
|
||||
)
|
||||
|
||||
func cancelOnSignals() {
|
||||
doneChan := syncx.NewDoneChan()
|
||||
defer doneChan.Close()
|
||||
|
||||
go func(dc *syncx.DoneChan) {
|
||||
c := make(chan os.Signal)
|
||||
signal.Notify(c, syscall.SIGTERM, syscall.SIGKILL, syscall.SIGINT, syscall.SIGTSTP, syscall.SIGQUIT)
|
||||
select {
|
||||
case <-c:
|
||||
console.Error(`
|
||||
migrate failed, reason: "User Canceled"`)
|
||||
os.Exit(0)
|
||||
case <-dc.Done():
|
||||
return
|
||||
}
|
||||
}(doneChan)
|
||||
}
|
||||
@@ -11,17 +11,29 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/logrusorgru/aurora"
|
||||
"github.com/urfave/cli"
|
||||
"github.com/zeromicro/go-zero/tools/goctl/util/console"
|
||||
"github.com/zeromicro/go-zero/tools/goctl/util/ctx"
|
||||
"github.com/urfave/cli"
|
||||
"github.com/zeromicro/go-zero/tools/goctl/vars"
|
||||
)
|
||||
|
||||
const zeromicroVersion = "1.3.0"
|
||||
|
||||
var fset = token.NewFileSet()
|
||||
const (
|
||||
confirmUnknown = iota
|
||||
confirmAll
|
||||
confirmIgnore
|
||||
)
|
||||
|
||||
var (
|
||||
fset = token.NewFileSet()
|
||||
builderxConfirm = confirmUnknown
|
||||
)
|
||||
|
||||
func Migrate(c *cli.Context) error {
|
||||
verbose := c.Bool("verbose")
|
||||
@@ -44,7 +56,9 @@ func Migrate(c *cli.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
console.Success("[OK] refactor finish, execute %q on project root to check status.", "go test -race ./...")
|
||||
if verbose {
|
||||
console.Success("[OK] refactor finish, execute %q on project root to check status.", "go test -race ./...")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -54,6 +68,8 @@ func rewriteImport(verbose bool) error {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
|
||||
cancelOnSignals()
|
||||
|
||||
wd, err := os.Getwd()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -64,7 +80,8 @@ func rewriteImport(verbose bool) error {
|
||||
}
|
||||
root := project.Dir
|
||||
fsys := os.DirFS(root)
|
||||
return fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error {
|
||||
var final []*ast.Package
|
||||
err = fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error {
|
||||
if !d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
@@ -78,23 +95,71 @@ func rewriteImport(verbose bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return rewriteFile(pkgs, verbose)
|
||||
err = rewriteFile(pkgs, verbose)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, v := range pkgs {
|
||||
final = append(final, v)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if verbose {
|
||||
console.Info("start to write files ... ")
|
||||
}
|
||||
return writeFile(final, verbose)
|
||||
}
|
||||
|
||||
func rewriteFile(pkgs map[string]*ast.Package, verbose bool) error {
|
||||
for _, pkg := range pkgs {
|
||||
for filename, file := range pkg.Files {
|
||||
var containsDeprecatedBuilderxPkg bool
|
||||
for _, imp := range file.Imports {
|
||||
if !strings.Contains(imp.Path.Value, deprecatedGoZeroMod) {
|
||||
continue
|
||||
}
|
||||
|
||||
if verbose {
|
||||
console.Debug("[...] migrating %q ... ", filepath.Base(filename))
|
||||
}
|
||||
|
||||
if strings.Contains(imp.Path.Value, deprecatedBuilderx) {
|
||||
containsDeprecatedBuilderxPkg = true
|
||||
var doNext bool
|
||||
refactorBuilderx(deprecatedBuilderx, replacementBuilderx, func(allow bool) {
|
||||
doNext = !allow
|
||||
if allow {
|
||||
newPath := strings.ReplaceAll(imp.Path.Value, deprecatedBuilderx, replacementBuilderx)
|
||||
imp.EndPos = imp.End()
|
||||
imp.Path.Value = newPath
|
||||
}
|
||||
})
|
||||
if !doNext {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
newPath := strings.ReplaceAll(imp.Path.Value, deprecatedGoZeroMod, goZeroMod)
|
||||
imp.EndPos = imp.End()
|
||||
imp.Path.Value = newPath
|
||||
}
|
||||
|
||||
var w = bytes.NewBuffer(nil)
|
||||
if containsDeprecatedBuilderxPkg {
|
||||
replacePkg(file)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeFile(pkgs []*ast.Package, verbose bool) error {
|
||||
for _, pkg := range pkgs {
|
||||
for filename, file := range pkg.Files {
|
||||
w := bytes.NewBuffer(nil)
|
||||
err := format.Node(w, fset, file)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[rewriteImport] format file %s error: %+v", filename, err)
|
||||
@@ -105,9 +170,107 @@ func rewriteFile(pkgs map[string]*ast.Package, verbose bool) error {
|
||||
return fmt.Errorf("[rewriteImport] write file %s error: %+v", filename, err)
|
||||
}
|
||||
if verbose {
|
||||
console.Success("[OK] rewriting %q ... ", filepath.Base(filename))
|
||||
console.Success("[OK] migrated %q successfully", filepath.Base(filename))
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func replacePkg(file *ast.File) {
|
||||
scope := file.Scope
|
||||
if scope == nil {
|
||||
return
|
||||
}
|
||||
obj := scope.Objects
|
||||
for _, v := range obj {
|
||||
decl := v.Decl
|
||||
if decl == nil {
|
||||
continue
|
||||
}
|
||||
vs, ok := decl.(*ast.ValueSpec)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
values := vs.Values
|
||||
if len(values) != 1 {
|
||||
continue
|
||||
}
|
||||
value := values[0]
|
||||
callExpr, ok := value.(*ast.CallExpr)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
fn := callExpr.Fun
|
||||
if fn == nil {
|
||||
continue
|
||||
}
|
||||
selector, ok := fn.(*ast.SelectorExpr)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
x := selector.X
|
||||
sel := selector.Sel
|
||||
if x == nil || sel == nil {
|
||||
continue
|
||||
}
|
||||
ident, ok := x.(*ast.Ident)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if ident.Name == "builderx" {
|
||||
ident.Name = "builder"
|
||||
ident.NamePos = ident.End()
|
||||
}
|
||||
if sel.Name == "FieldNames" {
|
||||
sel.Name = "RawFieldNames"
|
||||
sel.NamePos = sel.End()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func refactorBuilderx(deprecated, replacement string, fn func(allow bool)) {
|
||||
switch builderxConfirm {
|
||||
case confirmAll:
|
||||
fn(true)
|
||||
return
|
||||
case confirmIgnore:
|
||||
fn(false)
|
||||
return
|
||||
}
|
||||
|
||||
msg := fmt.Sprintf(`Detects a deprecated package in the source code,
|
||||
Deprecated package: %q
|
||||
Replacement package: %q
|
||||
It's recommended to use the replacement package, do you want to replace?
|
||||
['Y' for yes, 'N' for no, 'A' for all, 'I' for ignore]: `,
|
||||
deprecated, replacement)
|
||||
|
||||
if runtime.GOOS != vars.OsWindows {
|
||||
msg = aurora.Yellow(msg).String()
|
||||
}
|
||||
fmt.Print(msg)
|
||||
|
||||
for {
|
||||
var in string
|
||||
fmt.Scanln(&in)
|
||||
switch {
|
||||
case strings.EqualFold(in, "Y"):
|
||||
fn(true)
|
||||
return
|
||||
case strings.EqualFold(in, "N"):
|
||||
fn(false)
|
||||
return
|
||||
case strings.EqualFold(in, "A"):
|
||||
fn(true)
|
||||
builderxConfirm = confirmAll
|
||||
return
|
||||
case strings.EqualFold(in, "I"):
|
||||
fn(false)
|
||||
builderxConfirm = confirmIgnore
|
||||
return
|
||||
default:
|
||||
console.Warning("['Y' for yes, 'N' for no, 'A' for all, 'I' for ignore]: ")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,9 @@ import (
|
||||
"github.com/zeromicro/go-zero/tools/goctl/util/ctx"
|
||||
)
|
||||
|
||||
const deprecatedGoZeroMod = "github.com/zeromicro/go-zero"
|
||||
const deprecatedGoZeroMod = "github.com/tal-tech/go-zero"
|
||||
const deprecatedBuilderx = "github.com/tal-tech/go-zero/tools/goctl/model/sql/builderx"
|
||||
const replacementBuilderx = "github.com/zeromicro/go-zero/core/stores/builder"
|
||||
const goZeroMod = "github.com/zeromicro/go-zero"
|
||||
|
||||
var errInvalidGoMod = errors.New("it's only working for go module")
|
||||
|
||||
@@ -66,7 +66,7 @@ type User struct {
|
||||
|
||||
"github.com/globalsign/mgo/bson"
|
||||
cachec "github.com/zeromicro/go-zero/core/stores/cache"
|
||||
"github.com/tal-tech/go-zero/core/stores/mongoc"
|
||||
"github.com/zeromicro/go-zero/core/stores/mongoc"
|
||||
)
|
||||
|
||||
type UserModel interface {
|
||||
@@ -207,4 +207,4 @@ OPTIONS:
|
||||
|
||||
types.go本质上与xxxmodel.go无关,只是将type定义部分交给开发人员自己编写了,在xxxmodel.go中,mongo文档的存储结构必须包含
|
||||
`_id`字段,对应到types中的field为`ID`,model中的findOne,update均以data.ID来进行操作的,当然,如果不符合你的命名风格,你也 可以修改模板,只要保证`id`
|
||||
在types中的field名称和模板中一致就行。
|
||||
在types中的field名称和模板中一致就行。
|
||||
|
||||
Reference in New Issue
Block a user