Compare commits
6 Commits
tools/goct
...
v1.3.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7727d70634 | ||
|
|
5f9d101bc6 | ||
|
|
6c2abe7474 | ||
|
|
14a902c1a7 | ||
|
|
5ad6a6d229 | ||
|
|
6f4b97864a |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -16,7 +16,8 @@
|
|||||||
**/logs
|
**/logs
|
||||||
|
|
||||||
# for test purpose
|
# for test purpose
|
||||||
adhoc
|
**/adhoc
|
||||||
|
**/testdata
|
||||||
|
|
||||||
# gitlab ci
|
# gitlab ci
|
||||||
.cache
|
.cache
|
||||||
|
|||||||
@@ -3,12 +3,11 @@ package mr
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/zeromicro/go-zero/core/errorx"
|
"github.com/zeromicro/go-zero/core/errorx"
|
||||||
"github.com/zeromicro/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/zeromicro/go-zero/core/threading"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -42,6 +41,16 @@ type (
|
|||||||
// Option defines the method to customize the mapreduce.
|
// Option defines the method to customize the mapreduce.
|
||||||
Option func(opts *mapReduceOptions)
|
Option func(opts *mapReduceOptions)
|
||||||
|
|
||||||
|
mapperContext struct {
|
||||||
|
ctx context.Context
|
||||||
|
mapper MapFunc
|
||||||
|
source <-chan interface{}
|
||||||
|
panicChan *onceChan
|
||||||
|
collector chan<- interface{}
|
||||||
|
doneChan <-chan lang.PlaceholderType
|
||||||
|
workers int
|
||||||
|
}
|
||||||
|
|
||||||
mapReduceOptions struct {
|
mapReduceOptions struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
workers int
|
workers int
|
||||||
@@ -90,46 +99,72 @@ func FinishVoid(fns ...func()) {
|
|||||||
|
|
||||||
// ForEach maps all elements from given generate but no output.
|
// ForEach maps all elements from given generate but no output.
|
||||||
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
|
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
|
||||||
drain(Map(generate, func(item interface{}, writer Writer) {
|
|
||||||
mapper(item)
|
|
||||||
}, opts...))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Map maps all elements generated from given generate func, and returns an output channel.
|
|
||||||
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
|
|
||||||
options := buildOptions(opts...)
|
options := buildOptions(opts...)
|
||||||
source := buildSource(generate)
|
panicChan := &onceChan{channel: make(chan interface{})}
|
||||||
|
source := buildSource(generate, panicChan)
|
||||||
collector := make(chan interface{}, options.workers)
|
collector := make(chan interface{}, options.workers)
|
||||||
done := make(chan lang.PlaceholderType)
|
done := make(chan lang.PlaceholderType)
|
||||||
|
|
||||||
go executeMappers(options.ctx, mapper, source, collector, done, options.workers)
|
go executeMappers(mapperContext{
|
||||||
|
ctx: options.ctx,
|
||||||
|
mapper: func(item interface{}, writer Writer) {
|
||||||
|
mapper(item)
|
||||||
|
},
|
||||||
|
source: source,
|
||||||
|
panicChan: panicChan,
|
||||||
|
collector: collector,
|
||||||
|
doneChan: done,
|
||||||
|
workers: options.workers,
|
||||||
|
})
|
||||||
|
|
||||||
return collector
|
for {
|
||||||
|
select {
|
||||||
|
case v := <-panicChan.channel:
|
||||||
|
panic(v)
|
||||||
|
case _, ok := <-collector:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MapReduce maps all elements generated from given generate func,
|
// MapReduce maps all elements generated from given generate func,
|
||||||
// and reduces the output elements with given reducer.
|
// and reduces the output elements with given reducer.
|
||||||
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
|
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
|
||||||
opts ...Option) (interface{}, error) {
|
opts ...Option) (interface{}, error) {
|
||||||
source := buildSource(generate)
|
panicChan := &onceChan{channel: make(chan interface{})}
|
||||||
return MapReduceChan(source, mapper, reducer, opts...)
|
source := buildSource(generate, panicChan)
|
||||||
|
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||||
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
||||||
opts ...Option) (interface{}, error) {
|
opts ...Option) (interface{}, error) {
|
||||||
|
panicChan := &onceChan{channel: make(chan interface{})}
|
||||||
|
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||||
|
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
|
||||||
|
reducer ReducerFunc, opts ...Option) (interface{}, error) {
|
||||||
options := buildOptions(opts...)
|
options := buildOptions(opts...)
|
||||||
|
// output is used to write the final result
|
||||||
output := make(chan interface{})
|
output := make(chan interface{})
|
||||||
defer func() {
|
defer func() {
|
||||||
|
// reducer can only write once, if more, panic
|
||||||
for range output {
|
for range output {
|
||||||
panic("more than one element written in reducer")
|
panic("more than one element written in reducer")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// collector is used to collect data from mapper, and consume in reducer
|
||||||
collector := make(chan interface{}, options.workers)
|
collector := make(chan interface{}, options.workers)
|
||||||
|
// if done is closed, all mappers and reducer should stop processing
|
||||||
done := make(chan lang.PlaceholderType)
|
done := make(chan lang.PlaceholderType)
|
||||||
writer := newGuardedWriter(options.ctx, output, done)
|
writer := newGuardedWriter(options.ctx, output, done)
|
||||||
var closeOnce sync.Once
|
var closeOnce sync.Once
|
||||||
|
// use atomic.Value to avoid data race
|
||||||
var retErr errorx.AtomicError
|
var retErr errorx.AtomicError
|
||||||
finish := func() {
|
finish := func() {
|
||||||
closeOnce.Do(func() {
|
closeOnce.Do(func() {
|
||||||
@@ -151,30 +186,38 @@ func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer Reducer
|
|||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
drain(collector)
|
drain(collector)
|
||||||
|
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
cancel(fmt.Errorf("%v", r))
|
panicChan.write(r)
|
||||||
} else {
|
|
||||||
finish()
|
|
||||||
}
|
}
|
||||||
|
finish()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
reducer(collector, writer, cancel)
|
reducer(collector, writer, cancel)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go executeMappers(options.ctx, func(item interface{}, w Writer) {
|
go executeMappers(mapperContext{
|
||||||
mapper(item, w, cancel)
|
ctx: options.ctx,
|
||||||
}, source, collector, done, options.workers)
|
mapper: func(item interface{}, w Writer) {
|
||||||
|
mapper(item, w, cancel)
|
||||||
|
},
|
||||||
|
source: source,
|
||||||
|
panicChan: panicChan,
|
||||||
|
collector: collector,
|
||||||
|
doneChan: done,
|
||||||
|
workers: options.workers,
|
||||||
|
})
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-options.ctx.Done():
|
case <-options.ctx.Done():
|
||||||
cancel(context.DeadlineExceeded)
|
cancel(context.DeadlineExceeded)
|
||||||
return nil, context.DeadlineExceeded
|
return nil, context.DeadlineExceeded
|
||||||
case value, ok := <-output:
|
case v := <-panicChan.channel:
|
||||||
|
panic(v)
|
||||||
|
case v, ok := <-output:
|
||||||
if err := retErr.Load(); err != nil {
|
if err := retErr.Load(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if ok {
|
} else if ok {
|
||||||
return value, nil
|
return v, nil
|
||||||
} else {
|
} else {
|
||||||
return nil, ErrReduceNoOutput
|
return nil, ErrReduceNoOutput
|
||||||
}
|
}
|
||||||
@@ -221,12 +264,18 @@ func buildOptions(opts ...Option) *mapReduceOptions {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildSource(generate GenerateFunc) chan interface{} {
|
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
|
||||||
source := make(chan interface{})
|
source := make(chan interface{})
|
||||||
threading.GoSafe(func() {
|
go func() {
|
||||||
defer close(source)
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
panicChan.write(r)
|
||||||
|
}
|
||||||
|
close(source)
|
||||||
|
}()
|
||||||
|
|
||||||
generate(source)
|
generate(source)
|
||||||
})
|
}()
|
||||||
|
|
||||||
return source
|
return source
|
||||||
}
|
}
|
||||||
@@ -238,39 +287,43 @@ func drain(channel <-chan interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
|
func executeMappers(mCtx mapperContext) {
|
||||||
collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
defer func() {
|
defer func() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
close(collector)
|
close(mCtx.collector)
|
||||||
|
drain(mCtx.source)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
pool := make(chan lang.PlaceholderType, workers)
|
var failed int32
|
||||||
writer := newGuardedWriter(ctx, collector, done)
|
pool := make(chan lang.PlaceholderType, mCtx.workers)
|
||||||
for {
|
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
|
||||||
|
for atomic.LoadInt32(&failed) == 0 {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-mCtx.ctx.Done():
|
||||||
return
|
return
|
||||||
case <-done:
|
case <-mCtx.doneChan:
|
||||||
return
|
return
|
||||||
case pool <- lang.Placeholder:
|
case pool <- lang.Placeholder:
|
||||||
item, ok := <-input
|
item, ok := <-mCtx.source
|
||||||
if !ok {
|
if !ok {
|
||||||
<-pool
|
<-pool
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
// better to safely run caller defined method
|
go func() {
|
||||||
threading.GoSafe(func() {
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
atomic.AddInt32(&failed, 1)
|
||||||
|
mCtx.panicChan.write(r)
|
||||||
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
<-pool
|
<-pool
|
||||||
}()
|
}()
|
||||||
|
|
||||||
mapper(item, writer)
|
mCtx.mapper(item, writer)
|
||||||
})
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -316,3 +369,16 @@ func (gw guardedWriter) Write(v interface{}) {
|
|||||||
gw.channel <- v
|
gw.channel <- v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type onceChan struct {
|
||||||
|
channel chan interface{}
|
||||||
|
wrote int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (oc *onceChan) write(val interface{}) {
|
||||||
|
if atomic.AddInt32(&oc.wrote, 1) > 1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
oc.channel <- val
|
||||||
|
}
|
||||||
|
|||||||
78
core/mr/mapreduce_fuzz_test.go
Normal file
78
core/mr/mapreduce_fuzz_test.go
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
//go:build go1.18
|
||||||
|
// +build go1.18
|
||||||
|
|
||||||
|
package mr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"go.uber.org/goleak"
|
||||||
|
)
|
||||||
|
|
||||||
|
func FuzzMapReduce(f *testing.F) {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
f.Add(uint(10), uint(runtime.NumCPU()))
|
||||||
|
f.Fuzz(func(t *testing.T, num uint, workers uint) {
|
||||||
|
n := int64(num)%5000 + 5000
|
||||||
|
genPanic := rand.Intn(100) == 0
|
||||||
|
mapperPanic := rand.Intn(100) == 0
|
||||||
|
reducerPanic := rand.Intn(100) == 0
|
||||||
|
genIdx := rand.Int63n(n)
|
||||||
|
mapperIdx := rand.Int63n(n)
|
||||||
|
reducerIdx := rand.Int63n(n)
|
||||||
|
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||||
|
|
||||||
|
fn := func() (interface{}, error) {
|
||||||
|
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
|
||||||
|
|
||||||
|
return MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
source <- i
|
||||||
|
if genPanic && i == genIdx {
|
||||||
|
panic("foo")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
v := item.(int64)
|
||||||
|
if mapperPanic && v == mapperIdx {
|
||||||
|
panic("bar")
|
||||||
|
}
|
||||||
|
writer.Write(v * v)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
var idx int64
|
||||||
|
var total int64
|
||||||
|
for v := range pipe {
|
||||||
|
if reducerPanic && idx == reducerIdx {
|
||||||
|
panic("baz")
|
||||||
|
}
|
||||||
|
total += v.(int64)
|
||||||
|
idx++
|
||||||
|
}
|
||||||
|
writer.Write(total)
|
||||||
|
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||||
|
}
|
||||||
|
|
||||||
|
if genPanic || mapperPanic || reducerPanic {
|
||||||
|
var buf strings.Builder
|
||||||
|
buf.WriteString(fmt.Sprintf("n: %d", n))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
|
||||||
|
assert.Panicsf(t, func() { fn() }, buf.String())
|
||||||
|
} else {
|
||||||
|
val, err := fn()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, squareSum, val.(int64))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
107
core/mr/mapreduce_rand_test.go
Normal file
107
core/mr/mapreduce_rand_test.go
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
//go:build fuzz
|
||||||
|
// +build fuzz
|
||||||
|
|
||||||
|
package mr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
|
"gopkg.in/cheggaaa/pb.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
|
||||||
|
// so we need to simulate the fuzz test in test mode.
|
||||||
|
func TestMapReduceRandom(t *testing.T) {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
const (
|
||||||
|
times = 10000
|
||||||
|
nRange = 500
|
||||||
|
mega = 1024 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
bar := pb.New(times).Start()
|
||||||
|
runner := threading.NewTaskRunner(runtime.NumCPU())
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(times)
|
||||||
|
for i := 0; i < times; i++ {
|
||||||
|
runner.Schedule(func() {
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
if time.Since(start) > time.Minute {
|
||||||
|
t.Fatal("timeout")
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
||||||
|
n := rand.Int63n(nRange)%nRange + nRange
|
||||||
|
workers := rand.Int()%50 + runtime.NumCPU()/2
|
||||||
|
genPanic := rand.Intn(100) == 0
|
||||||
|
mapperPanic := rand.Intn(100) == 0
|
||||||
|
reducerPanic := rand.Intn(100) == 0
|
||||||
|
genIdx := rand.Int63n(n)
|
||||||
|
mapperIdx := rand.Int63n(n)
|
||||||
|
reducerIdx := rand.Int63n(n)
|
||||||
|
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||||
|
|
||||||
|
fn := func() (interface{}, error) {
|
||||||
|
return MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
source <- i
|
||||||
|
if genPanic && i == genIdx {
|
||||||
|
panic("foo")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
v := item.(int64)
|
||||||
|
if mapperPanic && v == mapperIdx {
|
||||||
|
panic("bar")
|
||||||
|
}
|
||||||
|
writer.Write(v * v)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
var idx int64
|
||||||
|
var total int64
|
||||||
|
for v := range pipe {
|
||||||
|
if reducerPanic && idx == reducerIdx {
|
||||||
|
panic("baz")
|
||||||
|
}
|
||||||
|
total += v.(int64)
|
||||||
|
idx++
|
||||||
|
}
|
||||||
|
writer.Write(total)
|
||||||
|
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||||
|
}
|
||||||
|
|
||||||
|
if genPanic || mapperPanic || reducerPanic {
|
||||||
|
var buf strings.Builder
|
||||||
|
buf.WriteString(fmt.Sprintf("n: %d", n))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
|
||||||
|
assert.Panicsf(t, func() { fn() }, buf.String())
|
||||||
|
} else {
|
||||||
|
val, err := fn()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, squareSum, val.(int64))
|
||||||
|
}
|
||||||
|
bar.Increment()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
bar.Finish()
|
||||||
|
}
|
||||||
@@ -11,8 +11,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/zeromicro/go-zero/core/stringx"
|
|
||||||
"github.com/zeromicro/go-zero/core/syncx"
|
|
||||||
"go.uber.org/goleak"
|
"go.uber.org/goleak"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -124,84 +122,69 @@ func TestForEach(t *testing.T) {
|
|||||||
t.Run("all", func(t *testing.T) {
|
t.Run("all", func(t *testing.T) {
|
||||||
defer goleak.VerifyNone(t)
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
ForEach(func(source chan<- interface{}) {
|
assert.PanicsWithValue(t, "foo", func() {
|
||||||
for i := 0; i < tasks; i++ {
|
ForEach(func(source chan<- interface{}) {
|
||||||
source <- i
|
for i := 0; i < tasks; i++ {
|
||||||
}
|
source <- i
|
||||||
}, func(item interface{}) {
|
}
|
||||||
panic("foo")
|
}, func(item interface{}) {
|
||||||
|
panic("foo")
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMap(t *testing.T) {
|
func TestGeneratePanic(t *testing.T) {
|
||||||
defer goleak.VerifyNone(t)
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
tests := []struct {
|
t.Run("all", func(t *testing.T) {
|
||||||
mapper MapFunc
|
assert.PanicsWithValue(t, "foo", func() {
|
||||||
expect int
|
ForEach(func(source chan<- interface{}) {
|
||||||
}{
|
panic("foo")
|
||||||
{
|
}, func(item interface{}) {
|
||||||
mapper: func(item interface{}, writer Writer) {
|
})
|
||||||
v := item.(int)
|
})
|
||||||
writer.Write(v * v)
|
})
|
||||||
},
|
}
|
||||||
expect: 30,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
mapper: func(item interface{}, writer Writer) {
|
|
||||||
v := item.(int)
|
|
||||||
if v%2 == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
writer.Write(v * v)
|
|
||||||
},
|
|
||||||
expect: 10,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
mapper: func(item interface{}, writer Writer) {
|
|
||||||
v := item.(int)
|
|
||||||
if v%2 == 0 {
|
|
||||||
panic(v)
|
|
||||||
}
|
|
||||||
writer.Write(v * v)
|
|
||||||
},
|
|
||||||
expect: 10,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
func TestMapperPanic(t *testing.T) {
|
||||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
defer goleak.VerifyNone(t)
|
||||||
channel := Map(func(source chan<- interface{}) {
|
|
||||||
for i := 1; i < 5; i++ {
|
const tasks = 1000
|
||||||
|
var run int32
|
||||||
|
t.Run("all", func(t *testing.T) {
|
||||||
|
assert.PanicsWithValue(t, "foo", func() {
|
||||||
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < tasks; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
}, test.mapper, WithWorkers(-1))
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
atomic.AddInt32(&run, 1)
|
||||||
var result int
|
panic("foo")
|
||||||
for v := range channel {
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
result += v.(int)
|
})
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, test.expect, result)
|
|
||||||
})
|
})
|
||||||
}
|
assert.True(t, atomic.LoadInt32(&run) < tasks/2)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduce(t *testing.T) {
|
func TestMapReduce(t *testing.T) {
|
||||||
defer goleak.VerifyNone(t)
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
name string
|
||||||
mapper MapperFunc
|
mapper MapperFunc
|
||||||
reducer ReducerFunc
|
reducer ReducerFunc
|
||||||
expectErr error
|
expectErr error
|
||||||
expectValue interface{}
|
expectValue interface{}
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
name: "simple",
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
expectValue: 30,
|
expectValue: 30,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with error",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -212,6 +195,7 @@ func TestMapReduce(t *testing.T) {
|
|||||||
expectErr: errDummy,
|
expectErr: errDummy,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with nil",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -223,6 +207,7 @@ func TestMapReduce(t *testing.T) {
|
|||||||
expectValue: nil,
|
expectValue: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with more",
|
||||||
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
var result int
|
var result int
|
||||||
for item := range pipe {
|
for item := range pipe {
|
||||||
@@ -237,45 +222,68 @@ func TestMapReduce(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
t.Run("MapReduce", func(t *testing.T) {
|
||||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
for _, test := range tests {
|
||||||
if test.mapper == nil {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
if test.mapper == nil {
|
||||||
v := item.(int)
|
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
writer.Write(v * v)
|
v := item.(int)
|
||||||
}
|
writer.Write(v * v)
|
||||||
}
|
|
||||||
if test.reducer == nil {
|
|
||||||
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
|
||||||
var result int
|
|
||||||
for item := range pipe {
|
|
||||||
result += item.(int)
|
|
||||||
}
|
}
|
||||||
writer.Write(result)
|
|
||||||
}
|
}
|
||||||
}
|
if test.reducer == nil {
|
||||||
value, err := MapReduce(func(source chan<- interface{}) {
|
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
for i := 1; i < 5; i++ {
|
var result int
|
||||||
source <- i
|
for item := range pipe {
|
||||||
|
result += item.(int)
|
||||||
|
}
|
||||||
|
writer.Write(result)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
|
value, err := MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := 1; i < 5; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
|
||||||
|
|
||||||
assert.Equal(t, test.expectErr, err)
|
assert.Equal(t, test.expectErr, err)
|
||||||
assert.Equal(t, test.expectValue, value)
|
assert.Equal(t, test.expectValue, value)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
|
|
||||||
func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
|
t.Run("MapReduce", func(t *testing.T) {
|
||||||
defer goleak.VerifyNone(t)
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
if test.mapper == nil {
|
||||||
|
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
v := item.(int)
|
||||||
|
writer.Write(v * v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if test.reducer == nil {
|
||||||
|
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
var result int
|
||||||
|
for item := range pipe {
|
||||||
|
result += item.(int)
|
||||||
|
}
|
||||||
|
writer.Write(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_, _ = MapReduce(func(source chan<- interface{}) {
|
source := make(chan interface{})
|
||||||
source <- 0
|
go func() {
|
||||||
source <- 1
|
for i := 1; i < 5; i++ {
|
||||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
source <- i
|
||||||
panic("foo")
|
}
|
||||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
close(source)
|
||||||
panic("bar")
|
}()
|
||||||
|
|
||||||
|
value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
|
||||||
|
assert.Equal(t, test.expectErr, err)
|
||||||
|
assert.Equal(t, test.expectValue, value)
|
||||||
|
})
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -302,16 +310,19 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
|
|
||||||
var value uint32
|
var value uint32
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
name string
|
||||||
mapper MapperFunc
|
mapper MapperFunc
|
||||||
reducer VoidReducerFunc
|
reducer VoidReducerFunc
|
||||||
expectValue uint32
|
expectValue uint32
|
||||||
expectErr error
|
expectErr error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
name: "simple",
|
||||||
expectValue: 30,
|
expectValue: 30,
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with error",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -322,6 +333,7 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
expectErr: errDummy,
|
expectErr: errDummy,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with nil",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -332,6 +344,7 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
expectErr: ErrCancelWithNil,
|
expectErr: ErrCancelWithNil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with more",
|
||||||
reducer: func(pipe <-chan interface{}, cancel func(error)) {
|
reducer: func(pipe <-chan interface{}, cancel func(error)) {
|
||||||
for item := range pipe {
|
for item := range pipe {
|
||||||
result := atomic.AddUint32(&value, uint32(item.(int)))
|
result := atomic.AddUint32(&value, uint32(item.(int)))
|
||||||
@@ -345,7 +358,7 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
atomic.StoreUint32(&value, 0)
|
atomic.StoreUint32(&value, 0)
|
||||||
|
|
||||||
if test.mapper == nil {
|
if test.mapper == nil {
|
||||||
@@ -400,39 +413,59 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
|
|||||||
assert.Equal(t, 0, result[1])
|
assert.Equal(t, 0, result[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapVoid(t *testing.T) {
|
|
||||||
defer goleak.VerifyNone(t)
|
|
||||||
|
|
||||||
const tasks = 1000
|
|
||||||
var count uint32
|
|
||||||
ForEach(func(source chan<- interface{}) {
|
|
||||||
for i := 0; i < tasks; i++ {
|
|
||||||
source <- i
|
|
||||||
}
|
|
||||||
}, func(item interface{}) {
|
|
||||||
atomic.AddUint32(&count, 1)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Equal(t, tasks, int(count))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMapReducePanic(t *testing.T) {
|
func TestMapReducePanic(t *testing.T) {
|
||||||
defer goleak.VerifyNone(t)
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
v, err := MapReduce(func(source chan<- interface{}) {
|
assert.Panics(t, func() {
|
||||||
source <- 0
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
source <- 1
|
source <- 0
|
||||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
source <- 1
|
||||||
i := item.(int)
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
writer.Write(i)
|
i := item.(int)
|
||||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
writer.Write(i)
|
||||||
for range pipe {
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
panic("panic")
|
for range pipe {
|
||||||
}
|
panic("panic")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapReducePanicOnce(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
assert.Panics(t, func() {
|
||||||
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
i := item.(int)
|
||||||
|
if i == 0 {
|
||||||
|
panic("foo")
|
||||||
|
}
|
||||||
|
writer.Write(i)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
for range pipe {
|
||||||
|
panic("bar")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
assert.Panics(t, func() {
|
||||||
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
|
source <- 0
|
||||||
|
source <- 1
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
panic("foo")
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
panic("bar")
|
||||||
|
})
|
||||||
})
|
})
|
||||||
assert.Nil(t, v)
|
|
||||||
assert.NotNil(t, err)
|
|
||||||
assert.Equal(t, "panic", err.Error())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoidCancel(t *testing.T) {
|
func TestMapReduceVoidCancel(t *testing.T) {
|
||||||
@@ -461,13 +494,13 @@ func TestMapReduceVoidCancel(t *testing.T) {
|
|||||||
func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
||||||
defer goleak.VerifyNone(t)
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var done syncx.AtomicBool
|
var done int32
|
||||||
var result []int
|
var result []int
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||||
for i := 0; i < defaultWorkers*2; i++ {
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
done.Set(true)
|
atomic.AddInt32(&done, 1)
|
||||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
i := item.(int)
|
i := item.(int)
|
||||||
if i == defaultWorkers/2 {
|
if i == defaultWorkers/2 {
|
||||||
@@ -482,7 +515,7 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
|||||||
})
|
})
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
assert.Equal(t, "anything", err.Error())
|
assert.Equal(t, "anything", err.Error())
|
||||||
assert.True(t, done.True())
|
assert.Equal(t, int32(1), done)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
||||||
@@ -507,34 +540,51 @@ func TestMapReduceVoidPanicInReducer(t *testing.T) {
|
|||||||
defer goleak.VerifyNone(t)
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
const message = "foo"
|
const message = "foo"
|
||||||
var done syncx.AtomicBool
|
assert.Panics(t, func() {
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
var done int32
|
||||||
|
_ = MapReduceVoid(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
atomic.AddInt32(&done, 1)
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
i := item.(int)
|
||||||
|
writer.Write(i)
|
||||||
|
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||||
|
panic(message)
|
||||||
|
}, WithWorkers(1))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForEachWithContext(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
var done int32
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
for i := 0; i < defaultWorkers*2; i++ {
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
done.Set(true)
|
atomic.AddInt32(&done, 1)
|
||||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
}, func(item interface{}) {
|
||||||
i := item.(int)
|
i := item.(int)
|
||||||
writer.Write(i)
|
if i == defaultWorkers/2 {
|
||||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
cancel()
|
||||||
panic(message)
|
}
|
||||||
}, WithWorkers(1))
|
}, WithContext(ctx))
|
||||||
assert.NotNil(t, err)
|
|
||||||
assert.Equal(t, message, err.Error())
|
|
||||||
assert.True(t, done.True())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceWithContext(t *testing.T) {
|
func TestMapReduceWithContext(t *testing.T) {
|
||||||
defer goleak.VerifyNone(t)
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var done syncx.AtomicBool
|
var done int32
|
||||||
var result []int
|
var result []int
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||||
for i := 0; i < defaultWorkers*2; i++ {
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
done.Set(true)
|
atomic.AddInt32(&done, 1)
|
||||||
}, func(item interface{}, writer Writer, c func(error)) {
|
}, func(item interface{}, writer Writer, c func(error)) {
|
||||||
i := item.(int)
|
i := item.(int)
|
||||||
if i == defaultWorkers/2 {
|
if i == defaultWorkers/2 {
|
||||||
|
|||||||
30
core/prof/runtime.go
Normal file
30
core/prof/runtime.go
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
package prof
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"runtime"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultInterval = time.Second * 5
|
||||||
|
mega = 1024 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
func DisplayStats(interval ...time.Duration) {
|
||||||
|
duration := defaultInterval
|
||||||
|
for _, val := range interval {
|
||||||
|
duration = val
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(duration)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for range ticker.C {
|
||||||
|
var m runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&m)
|
||||||
|
fmt.Printf("Goroutines: %d, Alloc: %vm, TotalAlloc: %vm, Sys: %vm, NumGC: %v\n",
|
||||||
|
runtime.NumGoroutine(), m.Alloc/mega, m.TotalAlloc/mega, m.Sys/mega, m.NumGC)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
@@ -11,6 +11,8 @@
|
|||||||
[](https://github.com/zeromicro/go-zero)
|
[](https://github.com/zeromicro/go-zero)
|
||||||
[](https://opensource.org/licenses/MIT)
|
[](https://opensource.org/licenses/MIT)
|
||||||
|
|
||||||
|
> ***缩短从需求到上线的距离***
|
||||||
|
|
||||||
**注意:为了满足开源基金会要求,go-zero 从好未来(tal-tech)组织下迁移至中立的 GitHub 组织(zeromicro)。**
|
**注意:为了满足开源基金会要求,go-zero 从好未来(tal-tech)组织下迁移至中立的 GitHub 组织(zeromicro)。**
|
||||||
|
|
||||||
## 0. go-zero 介绍
|
## 0. go-zero 介绍
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// BuildVersion is the version of goctl.
|
// BuildVersion is the version of goctl.
|
||||||
const BuildVersion = "1.3.0"
|
const BuildVersion = "1.3.0-20220201"
|
||||||
|
|
||||||
var tag = map[string]int{"pre-alpha": 0, "alpha": 1, "pre-bata": 2, "beta": 3, "released": 4, "": 5}
|
var tag = map[string]int{"pre-alpha": 0, "alpha": 1, "pre-bata": 2, "beta": 3, "released": 4, "": 5}
|
||||||
|
|
||||||
|
|||||||
@@ -27,7 +27,16 @@ import (
|
|||||||
|
|
||||||
const zeromicroVersion = "1.3.0"
|
const zeromicroVersion = "1.3.0"
|
||||||
|
|
||||||
var fset = token.NewFileSet()
|
const (
|
||||||
|
confirmUnknown = iota
|
||||||
|
confirmAll
|
||||||
|
confirmIgnore
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
fset = token.NewFileSet()
|
||||||
|
builderxConfirm = confirmUnknown
|
||||||
|
)
|
||||||
|
|
||||||
func Migrate(c *cli.Context) error {
|
func Migrate(c *cli.Context) error {
|
||||||
verbose := c.Bool("verbose")
|
verbose := c.Bool("verbose")
|
||||||
@@ -133,7 +142,7 @@ func rewriteFile(pkgs map[string]*ast.Package, verbose bool) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if verbose {
|
if verbose {
|
||||||
console.Debug("[...] migrate %q ... ", filepath.Base(filename))
|
console.Debug("[...] migrating %q ... ", filepath.Base(filename))
|
||||||
}
|
}
|
||||||
|
|
||||||
if strings.Contains(imp.Path.Value, deprecatedBuilderx) {
|
if strings.Contains(imp.Path.Value, deprecatedBuilderx) {
|
||||||
@@ -179,7 +188,7 @@ func writeFile(pkgs []*ast.Package, verbose bool) error {
|
|||||||
return fmt.Errorf("[rewriteImport] write file %s error: %+v", filename, err)
|
return fmt.Errorf("[rewriteImport] write file %s error: %+v", filename, err)
|
||||||
}
|
}
|
||||||
if verbose {
|
if verbose {
|
||||||
console.Success("[OK] migrate %q success ", filepath.Base(filename))
|
console.Success("[OK] migrated %q successfully", filepath.Base(filename))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -239,11 +248,21 @@ func replacePkg(file *ast.File) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func refactorBuilderx(deprecated, replacement string, fn func(allow bool)) {
|
func refactorBuilderx(deprecated, replacement string, fn func(allow bool)) {
|
||||||
|
switch builderxConfirm {
|
||||||
|
case confirmAll:
|
||||||
|
fn(true)
|
||||||
|
return
|
||||||
|
case confirmIgnore:
|
||||||
|
fn(false)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
msg := fmt.Sprintf(`Detects a deprecated package in the source code,
|
msg := fmt.Sprintf(`Detects a deprecated package in the source code,
|
||||||
Deprecated package: %q
|
Deprecated package: %q
|
||||||
Replacement package: %q
|
Replacement package: %q
|
||||||
It's recommended to use the replacement package, do you want to replace?
|
It's recommended to use the replacement package, do you want to replace?
|
||||||
[input 'Y' for yes, 'N' for no]: `, deprecated, replacement)
|
['Y' for yes, 'N' for no, 'A' for all, 'I' for ignore]: `,
|
||||||
|
deprecated, replacement)
|
||||||
|
|
||||||
if runtime.GOOS != vars.OsWindows {
|
if runtime.GOOS != vars.OsWindows {
|
||||||
msg = aurora.Yellow(msg).String()
|
msg = aurora.Yellow(msg).String()
|
||||||
@@ -253,21 +272,23 @@ It's recommended to use the replacement package, do you want to replace?
|
|||||||
for {
|
for {
|
||||||
var in string
|
var in string
|
||||||
fmt.Scanln(&in)
|
fmt.Scanln(&in)
|
||||||
if len(in) == 0 {
|
switch {
|
||||||
console.Warning("nothing input, please try again [input 'Y' for yes, 'N' for no]:")
|
case strings.EqualFold(in, "Y"):
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if strings.EqualFold(in, "Y") {
|
|
||||||
fn(true)
|
fn(true)
|
||||||
return
|
return
|
||||||
}
|
case strings.EqualFold(in, "N"):
|
||||||
|
|
||||||
if strings.EqualFold(in, "N") {
|
|
||||||
fn(false)
|
fn(false)
|
||||||
return
|
return
|
||||||
|
case strings.EqualFold(in, "A"):
|
||||||
|
fn(true)
|
||||||
|
builderxConfirm = confirmAll
|
||||||
|
return
|
||||||
|
case strings.EqualFold(in, "I"):
|
||||||
|
fn(false)
|
||||||
|
builderxConfirm = confirmIgnore
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
console.Warning("['Y' for yes, 'N' for no, 'A' for all, 'I' for ignore]: ")
|
||||||
}
|
}
|
||||||
|
|
||||||
console.Warning("invalid input, please try again [input 'Y' for yes, 'N' for no]:")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user