Compare commits

...

6 Commits

Author SHA1 Message Date
Kevin Wan
7727d70634 chore: update goctl version (#1497) 2022-02-01 09:50:26 +08:00
Kevin Wan
5f9d101bc6 feat: add runtime stats monitor (#1496) 2022-02-01 01:34:25 +08:00
Kevin Wan
6c2abe7474 fix: goroutine stuck on edge case (#1495)
* fix: goroutine stuck on edge case

* refactor: simplify mapreduce implementation
2022-01-30 13:09:21 +08:00
Kevin Wan
14a902c1a7 feat: handling panic in mapreduce, panic in calling goroutine, not inside goroutines (#1490)
* feat: handle panic

* chore: update fuzz test

* chore: optimize square sum algorithm
2022-01-28 10:59:41 +08:00
Kevin Wan
5ad6a6d229 Update readme-cn.md
add slogan
2022-01-27 17:16:30 +08:00
Kevin Wan
6f4b97864a chore: improve migrate confirmation (#1488) 2022-01-27 11:30:35 +08:00
9 changed files with 545 additions and 190 deletions

.gitignore

@@ -16,7 +16,8 @@
 **/logs

 # for test purpose
-adhoc
+**/adhoc
+**/testdata

 # gitlab ci
 .cache

core/mr/mapreduce.go

@@ -3,12 +3,11 @@ package mr
 import (
 	"context"
 	"errors"
-	"fmt"
 	"sync"
+	"sync/atomic"

 	"github.com/zeromicro/go-zero/core/errorx"
 	"github.com/zeromicro/go-zero/core/lang"
-	"github.com/zeromicro/go-zero/core/threading"
 )

 const (
@@ -42,6 +41,16 @@ type (
 	// Option defines the method to customize the mapreduce.
 	Option func(opts *mapReduceOptions)

+	mapperContext struct {
+		ctx       context.Context
+		mapper    MapFunc
+		source    <-chan interface{}
+		panicChan *onceChan
+		collector chan<- interface{}
+		doneChan  <-chan lang.PlaceholderType
+		workers   int
+	}
+
 	mapReduceOptions struct {
 		ctx     context.Context
 		workers int
@@ -90,46 +99,72 @@ func FinishVoid(fns ...func()) {
 // ForEach maps all elements from given generate but no output.
 func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
-	drain(Map(generate, func(item interface{}, writer Writer) {
-		mapper(item)
-	}, opts...))
-}
-
-// Map maps all elements generated from given generate func, and returns an output channel.
-func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
 	options := buildOptions(opts...)
-	source := buildSource(generate)
+	panicChan := &onceChan{channel: make(chan interface{})}
+	source := buildSource(generate, panicChan)
 	collector := make(chan interface{}, options.workers)
 	done := make(chan lang.PlaceholderType)

-	go executeMappers(options.ctx, mapper, source, collector, done, options.workers)
+	go executeMappers(mapperContext{
+		ctx: options.ctx,
+		mapper: func(item interface{}, writer Writer) {
+			mapper(item)
+		},
+		source:    source,
+		panicChan: panicChan,
+		collector: collector,
+		doneChan:  done,
+		workers:   options.workers,
+	})

-	return collector
+	for {
+		select {
+		case v := <-panicChan.channel:
+			panic(v)
+		case _, ok := <-collector:
+			if !ok {
+				return
+			}
+		}
+	}
 }

 // MapReduce maps all elements generated from given generate func,
 // and reduces the output elements with given reducer.
 func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
 	opts ...Option) (interface{}, error) {
-	source := buildSource(generate)
-	return MapReduceChan(source, mapper, reducer, opts...)
+	panicChan := &onceChan{channel: make(chan interface{})}
+	source := buildSource(generate, panicChan)
+	return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
 }

 // MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
 func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
 	opts ...Option) (interface{}, error) {
+	panicChan := &onceChan{channel: make(chan interface{})}
+	return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
+}
+
+// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
+func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
+	reducer ReducerFunc, opts ...Option) (interface{}, error) {
 	options := buildOptions(opts...)
+	// output is used to write the final result
 	output := make(chan interface{})
 	defer func() {
+		// reducer can only write once, if more, panic
 		for range output {
 			panic("more than one element written in reducer")
 		}
 	}()

+	// collector is used to collect data from mapper, and consume in reducer
 	collector := make(chan interface{}, options.workers)
+	// if done is closed, all mappers and reducer should stop processing
 	done := make(chan lang.PlaceholderType)
 	writer := newGuardedWriter(options.ctx, output, done)
 	var closeOnce sync.Once
+	// use atomic.Value to avoid data race
 	var retErr errorx.AtomicError
 	finish := func() {
 		closeOnce.Do(func() {
@@ -151,30 +186,38 @@ func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
 	go func() {
 		defer func() {
 			drain(collector)
 			if r := recover(); r != nil {
-				cancel(fmt.Errorf("%v", r))
-			} else {
-				finish()
+				panicChan.write(r)
 			}
+			finish()
 		}()

 		reducer(collector, writer, cancel)
 	}()

-	go executeMappers(options.ctx, func(item interface{}, w Writer) {
-		mapper(item, w, cancel)
-	}, source, collector, done, options.workers)
+	go executeMappers(mapperContext{
+		ctx: options.ctx,
+		mapper: func(item interface{}, w Writer) {
+			mapper(item, w, cancel)
+		},
+		source:    source,
+		panicChan: panicChan,
+		collector: collector,
+		doneChan:  done,
+		workers:   options.workers,
+	})

 	select {
 	case <-options.ctx.Done():
 		cancel(context.DeadlineExceeded)
 		return nil, context.DeadlineExceeded
-	case value, ok := <-output:
+	case v := <-panicChan.channel:
+		panic(v)
+	case v, ok := <-output:
 		if err := retErr.Load(); err != nil {
 			return nil, err
 		} else if ok {
-			return value, nil
+			return v, nil
 		} else {
 			return nil, ErrReduceNoOutput
 		}
@@ -221,12 +264,18 @@ func buildOptions(opts ...Option) *mapReduceOptions {
 	return options
 }

-func buildSource(generate GenerateFunc) chan interface{} {
+func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
 	source := make(chan interface{})
-	threading.GoSafe(func() {
-		defer close(source)
+	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				panicChan.write(r)
+			}
+			close(source)
+		}()
+
 		generate(source)
-	})
+	}()

 	return source
 }
@@ -238,39 +287,43 @@ func drain(channel <-chan interface{}) {
 	}
 }

-func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
-	collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
+func executeMappers(mCtx mapperContext) {
 	var wg sync.WaitGroup
 	defer func() {
 		wg.Wait()
-		close(collector)
+		close(mCtx.collector)
+		drain(mCtx.source)
 	}()

-	pool := make(chan lang.PlaceholderType, workers)
-	writer := newGuardedWriter(ctx, collector, done)
-	for {
+	var failed int32
+	pool := make(chan lang.PlaceholderType, mCtx.workers)
+	writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
+	for atomic.LoadInt32(&failed) == 0 {
 		select {
-		case <-ctx.Done():
+		case <-mCtx.ctx.Done():
 			return
-		case <-done:
+		case <-mCtx.doneChan:
 			return
 		case pool <- lang.Placeholder:
-			item, ok := <-input
+			item, ok := <-mCtx.source
 			if !ok {
 				<-pool
 				return
 			}

 			wg.Add(1)
-			// better to safely run caller defined method
-			threading.GoSafe(func() {
+			go func() {
 				defer func() {
+					if r := recover(); r != nil {
+						atomic.AddInt32(&failed, 1)
+						mCtx.panicChan.write(r)
+					}
 					wg.Done()
 					<-pool
 				}()

-				mapper(item, writer)
-			})
+				mCtx.mapper(item, writer)
+			}()
 		}
 	}
 }
@@ -316,3 +369,16 @@ func (gw guardedWriter) Write(v interface{}) {
 		gw.channel <- v
 	}
 }
+
+type onceChan struct {
+	channel chan interface{}
+	wrote   int32
+}
+
+func (oc *onceChan) write(val interface{}) {
+	if atomic.AddInt32(&oc.wrote, 1) > 1 {
+		return
+	}
+
+	oc.channel <- val
+}
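
Editor's note: the net effect of the onceChan plumbing above is that a panic raised in the generator, in any mapper goroutine, or in the reducer is forwarded once to the goroutine that called MapReduce or ForEach and re-thrown there, instead of being converted into an error as before. A minimal caller-side sketch, assuming go-zero with these commits applied and imported as github.com/zeromicro/go-zero/core/mr:

package main

import (
	"fmt"

	"github.com/zeromicro/go-zero/core/mr"
)

func main() {
	// The panic below starts in a mapper goroutine, is written once to the
	// internal onceChan, and is re-raised in this calling goroutine, so a
	// plain recover here can observe it.
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered from mapper panic:", r)
		}
	}()

	_, _ = mr.MapReduce(func(source chan<- interface{}) {
		for i := 0; i < 10; i++ {
			source <- i
		}
	}, func(item interface{}, writer mr.Writer, cancel func(error)) {
		if item.(int) == 5 {
			panic("boom")
		}
		writer.Write(item.(int) * item.(int))
	}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
		sum := 0
		for v := range pipe {
			sum += v.(int)
		}
		writer.Write(sum)
	})
}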


@@ -0,0 +1,78 @@
//go:build go1.18
// +build go1.18

package mr

import (
	"fmt"
	"math/rand"
	"runtime"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"go.uber.org/goleak"
)

func FuzzMapReduce(f *testing.F) {
	rand.Seed(time.Now().UnixNano())

	f.Add(uint(10), uint(runtime.NumCPU()))
	f.Fuzz(func(t *testing.T, num uint, workers uint) {
		n := int64(num)%5000 + 5000
		genPanic := rand.Intn(100) == 0
		mapperPanic := rand.Intn(100) == 0
		reducerPanic := rand.Intn(100) == 0
		genIdx := rand.Int63n(n)
		mapperIdx := rand.Int63n(n)
		reducerIdx := rand.Int63n(n)
		squareSum := (n - 1) * n * (2*n - 1) / 6

		fn := func() (interface{}, error) {
			defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

			return MapReduce(func(source chan<- interface{}) {
				for i := int64(0); i < n; i++ {
					source <- i
					if genPanic && i == genIdx {
						panic("foo")
					}
				}
			}, func(item interface{}, writer Writer, cancel func(error)) {
				v := item.(int64)
				if mapperPanic && v == mapperIdx {
					panic("bar")
				}
				writer.Write(v * v)
			}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
				var idx int64
				var total int64
				for v := range pipe {
					if reducerPanic && idx == reducerIdx {
						panic("baz")
					}
					total += v.(int64)
					idx++
				}
				writer.Write(total)
			}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
		}

		if genPanic || mapperPanic || reducerPanic {
			var buf strings.Builder
			buf.WriteString(fmt.Sprintf("n: %d", n))
			buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
			buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
			buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
			buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
			buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
			buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
			assert.Panicsf(t, func() { fn() }, buf.String())
		} else {
			val, err := fn()
			assert.Nil(t, err)
			assert.Equal(t, squareSum, val.(int64))
		}
	})
}
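
Editor's note: assuming Go 1.18 or newer and that this file lives in the core/mr package directory, the fuzz target above can be exercised locally with something like go test -run=^$ -fuzz=FuzzMapReduce -fuzztime=30s ./core/mr from the repository root; the exact path is an assumption, not part of this diff.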


@@ -0,0 +1,107 @@
//go:build fuzz
// +build fuzz

package mr

import (
	"fmt"
	"math/rand"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/zeromicro/go-zero/core/threading"
	"gopkg.in/cheggaaa/pb.v1"
)

// If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
// so we need to simulate the fuzz test in test mode.
func TestMapReduceRandom(t *testing.T) {
	rand.Seed(time.Now().UnixNano())

	const (
		times  = 10000
		nRange = 500
		mega   = 1024 * 1024
	)

	bar := pb.New(times).Start()
	runner := threading.NewTaskRunner(runtime.NumCPU())
	var wg sync.WaitGroup
	wg.Add(times)
	for i := 0; i < times; i++ {
		runner.Schedule(func() {
			start := time.Now()
			defer func() {
				if time.Since(start) > time.Minute {
					t.Fatal("timeout")
				}
				wg.Done()
			}()

			t.Run(strconv.Itoa(i), func(t *testing.T) {
				n := rand.Int63n(nRange)%nRange + nRange
				workers := rand.Int()%50 + runtime.NumCPU()/2
				genPanic := rand.Intn(100) == 0
				mapperPanic := rand.Intn(100) == 0
				reducerPanic := rand.Intn(100) == 0
				genIdx := rand.Int63n(n)
				mapperIdx := rand.Int63n(n)
				reducerIdx := rand.Int63n(n)
				squareSum := (n - 1) * n * (2*n - 1) / 6

				fn := func() (interface{}, error) {
					return MapReduce(func(source chan<- interface{}) {
						for i := int64(0); i < n; i++ {
							source <- i
							if genPanic && i == genIdx {
								panic("foo")
							}
						}
					}, func(item interface{}, writer Writer, cancel func(error)) {
						v := item.(int64)
						if mapperPanic && v == mapperIdx {
							panic("bar")
						}
						writer.Write(v * v)
					}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
						var idx int64
						var total int64
						for v := range pipe {
							if reducerPanic && idx == reducerIdx {
								panic("baz")
							}
							total += v.(int64)
							idx++
						}
						writer.Write(total)
					}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
				}

				if genPanic || mapperPanic || reducerPanic {
					var buf strings.Builder
					buf.WriteString(fmt.Sprintf("n: %d", n))
					buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
					buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
					buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
					buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
					buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
					buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
					assert.Panicsf(t, func() { fn() }, buf.String())
				} else {
					val, err := fn()
					assert.Nil(t, err)
					assert.Equal(t, squareSum, val.(int64))
				}

				bar.Increment()
			})
		})
	}

	wg.Wait()
	bar.Finish()
}
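
Editor's note: because of the fuzz build tag at the top of this file, TestMapReduceRandom is excluded from a normal go test run; it only compiles when the tag is supplied, for example go test -tags fuzz -run TestMapReduceRandom ./core/mr (the package path is an assumption, not part of this diff).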

core/mr/mapreduce_test.go

@@ -11,8 +11,6 @@ import (
 	"time"

 	"github.com/stretchr/testify/assert"
-	"github.com/zeromicro/go-zero/core/stringx"
-	"github.com/zeromicro/go-zero/core/syncx"
 	"go.uber.org/goleak"
 )

@@ -124,84 +122,69 @@ func TestForEach(t *testing.T) {
 	t.Run("all", func(t *testing.T) {
 		defer goleak.VerifyNone(t)

-		ForEach(func(source chan<- interface{}) {
-			for i := 0; i < tasks; i++ {
-				source <- i
-			}
-		}, func(item interface{}) {
-			panic("foo")
+		assert.PanicsWithValue(t, "foo", func() {
+			ForEach(func(source chan<- interface{}) {
+				for i := 0; i < tasks; i++ {
+					source <- i
+				}
+			}, func(item interface{}) {
+				panic("foo")
+			})
 		})
 	})
 }

-func TestMap(t *testing.T) {
+func TestGeneratePanic(t *testing.T) {
 	defer goleak.VerifyNone(t)

-	tests := []struct {
-		mapper MapFunc
-		expect int
-	}{
-		{
-			mapper: func(item interface{}, writer Writer) {
-				v := item.(int)
-				writer.Write(v * v)
-			},
-			expect: 30,
-		},
-		{
-			mapper: func(item interface{}, writer Writer) {
-				v := item.(int)
-				if v%2 == 0 {
-					return
-				}
-				writer.Write(v * v)
-			},
-			expect: 10,
-		},
-		{
-			mapper: func(item interface{}, writer Writer) {
-				v := item.(int)
-				if v%2 == 0 {
-					panic(v)
-				}
-				writer.Write(v * v)
-			},
-			expect: 10,
-		},
-	}
+	t.Run("all", func(t *testing.T) {
+		assert.PanicsWithValue(t, "foo", func() {
+			ForEach(func(source chan<- interface{}) {
+				panic("foo")
+			}, func(item interface{}) {
+			})
+		})
+	})
+}

-	for _, test := range tests {
-		t.Run(stringx.Rand(), func(t *testing.T) {
-			channel := Map(func(source chan<- interface{}) {
-				for i := 1; i < 5; i++ {
-					source <- i
-				}
-			}, test.mapper, WithWorkers(-1))
-
-			var result int
-			for v := range channel {
-				result += v.(int)
-			}
-
-			assert.Equal(t, test.expect, result)
-		})
-	}
+func TestMapperPanic(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
+	const tasks = 1000
+	var run int32
+	t.Run("all", func(t *testing.T) {
+		assert.PanicsWithValue(t, "foo", func() {
+			_, _ = MapReduce(func(source chan<- interface{}) {
+				for i := 0; i < tasks; i++ {
+					source <- i
+				}
+			}, func(item interface{}, writer Writer, cancel func(error)) {
+				atomic.AddInt32(&run, 1)
+				panic("foo")
+			}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+			})
+		})
+		assert.True(t, atomic.LoadInt32(&run) < tasks/2)
+	})
 }

 func TestMapReduce(t *testing.T) {
 	defer goleak.VerifyNone(t)

 	tests := []struct {
+		name        string
 		mapper      MapperFunc
 		reducer     ReducerFunc
 		expectErr   error
 		expectValue interface{}
 	}{
 		{
+			name:        "simple",
 			expectErr:   nil,
 			expectValue: 30,
 		},
 		{
+			name: "cancel with error",
 			mapper: func(item interface{}, writer Writer, cancel func(error)) {
 				v := item.(int)
 				if v%3 == 0 {
@@ -212,6 +195,7 @@ func TestMapReduce(t *testing.T) {
 			expectErr: errDummy,
 		},
 		{
+			name: "cancel with nil",
 			mapper: func(item interface{}, writer Writer, cancel func(error)) {
 				v := item.(int)
 				if v%3 == 0 {
@@ -223,6 +207,7 @@ func TestMapReduce(t *testing.T) {
 			expectValue: nil,
 		},
 		{
+			name: "cancel with more",
 			reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
 				var result int
 				for item := range pipe {
@@ -237,45 +222,68 @@ func TestMapReduce(t *testing.T) {
 		},
 	}

-	for _, test := range tests {
-		t.Run(stringx.Rand(), func(t *testing.T) {
-			if test.mapper == nil {
-				test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
-					v := item.(int)
-					writer.Write(v * v)
-				}
-			}
-			if test.reducer == nil {
-				test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
-					var result int
-					for item := range pipe {
-						result += item.(int)
-					}
-					writer.Write(result)
-				}
-			}
-			value, err := MapReduce(func(source chan<- interface{}) {
-				for i := 1; i < 5; i++ {
-					source <- i
-				}
-			}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
-
-			assert.Equal(t, test.expectErr, err)
-			assert.Equal(t, test.expectValue, value)
-		})
-	}
-}
-
-func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
-	defer goleak.VerifyNone(t)
-
-	_, _ = MapReduce(func(source chan<- interface{}) {
-		source <- 0
-		source <- 1
-	}, func(item interface{}, writer Writer, cancel func(error)) {
-		panic("foo")
-	}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
-		panic("bar")
+	t.Run("MapReduce", func(t *testing.T) {
+		for _, test := range tests {
+			t.Run(test.name, func(t *testing.T) {
+				if test.mapper == nil {
+					test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
+						v := item.(int)
+						writer.Write(v * v)
+					}
+				}
+				if test.reducer == nil {
+					test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+						var result int
+						for item := range pipe {
+							result += item.(int)
+						}
+						writer.Write(result)
+					}
+				}
+				value, err := MapReduce(func(source chan<- interface{}) {
+					for i := 1; i < 5; i++ {
+						source <- i
+					}
+				}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
+
+				assert.Equal(t, test.expectErr, err)
+				assert.Equal(t, test.expectValue, value)
+			})
+		}
+	})
+
+	t.Run("MapReduce", func(t *testing.T) {
+		for _, test := range tests {
+			t.Run(test.name, func(t *testing.T) {
+				if test.mapper == nil {
+					test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
+						v := item.(int)
+						writer.Write(v * v)
+					}
+				}
+				if test.reducer == nil {
+					test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+						var result int
+						for item := range pipe {
+							result += item.(int)
+						}
+						writer.Write(result)
+					}
+				}
+				source := make(chan interface{})
+				go func() {
+					for i := 1; i < 5; i++ {
+						source <- i
+					}
+					close(source)
+				}()
+				value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
+				assert.Equal(t, test.expectErr, err)
+				assert.Equal(t, test.expectValue, value)
+			})
+		}
 	})
 }

@@ -302,16 +310,19 @@ func TestMapReduceVoid(t *testing.T) {
 	var value uint32
 	tests := []struct {
+		name        string
 		mapper      MapperFunc
 		reducer     VoidReducerFunc
 		expectValue uint32
 		expectErr   error
 	}{
 		{
+			name:        "simple",
 			expectValue: 30,
 			expectErr:   nil,
 		},
 		{
+			name: "cancel with error",
 			mapper: func(item interface{}, writer Writer, cancel func(error)) {
 				v := item.(int)
 				if v%3 == 0 {
@@ -322,6 +333,7 @@ func TestMapReduceVoid(t *testing.T) {
 			expectErr: errDummy,
 		},
 		{
+			name: "cancel with nil",
 			mapper: func(item interface{}, writer Writer, cancel func(error)) {
 				v := item.(int)
 				if v%3 == 0 {
@@ -332,6 +344,7 @@ func TestMapReduceVoid(t *testing.T) {
 			expectErr: ErrCancelWithNil,
 		},
 		{
+			name: "cancel with more",
 			reducer: func(pipe <-chan interface{}, cancel func(error)) {
 				for item := range pipe {
 					result := atomic.AddUint32(&value, uint32(item.(int)))
@@ -345,7 +358,7 @@ func TestMapReduceVoid(t *testing.T) {
 	}

 	for _, test := range tests {
-		t.Run(stringx.Rand(), func(t *testing.T) {
+		t.Run(test.name, func(t *testing.T) {
 			atomic.StoreUint32(&value, 0)

 			if test.mapper == nil {
@@ -400,39 +413,59 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
 	assert.Equal(t, 0, result[1])
 }

-func TestMapVoid(t *testing.T) {
-	defer goleak.VerifyNone(t)
-
-	const tasks = 1000
-	var count uint32
-	ForEach(func(source chan<- interface{}) {
-		for i := 0; i < tasks; i++ {
-			source <- i
-		}
-	}, func(item interface{}) {
-		atomic.AddUint32(&count, 1)
-	})
-
-	assert.Equal(t, tasks, int(count))
-}
-
 func TestMapReducePanic(t *testing.T) {
 	defer goleak.VerifyNone(t)

-	v, err := MapReduce(func(source chan<- interface{}) {
-		source <- 0
-		source <- 1
-	}, func(item interface{}, writer Writer, cancel func(error)) {
-		i := item.(int)
-		writer.Write(i)
-	}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
-		for range pipe {
-			panic("panic")
-		}
+	assert.Panics(t, func() {
+		_, _ = MapReduce(func(source chan<- interface{}) {
+			source <- 0
+			source <- 1
+		}, func(item interface{}, writer Writer, cancel func(error)) {
+			i := item.(int)
+			writer.Write(i)
+		}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+			for range pipe {
+				panic("panic")
+			}
+		})
+	})
+}
+
+func TestMapReducePanicOnce(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
+	assert.Panics(t, func() {
+		_, _ = MapReduce(func(source chan<- interface{}) {
+			for i := 0; i < 100; i++ {
+				source <- i
+			}
+		}, func(item interface{}, writer Writer, cancel func(error)) {
+			i := item.(int)
+			if i == 0 {
+				panic("foo")
+			}
+			writer.Write(i)
+		}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+			for range pipe {
+				panic("bar")
+			}
+		})
+	})
+}
+
+func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
+	assert.Panics(t, func() {
+		_, _ = MapReduce(func(source chan<- interface{}) {
+			source <- 0
+			source <- 1
+		}, func(item interface{}, writer Writer, cancel func(error)) {
+			panic("foo")
+		}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+			panic("bar")
+		})
 	})
-	assert.Nil(t, v)
-	assert.NotNil(t, err)
-	assert.Equal(t, "panic", err.Error())
 }

 func TestMapReduceVoidCancel(t *testing.T) {
@@ -461,13 +494,13 @@ func TestMapReduceVoidCancel(t *testing.T) {
 func TestMapReduceVoidCancelWithRemains(t *testing.T) {
 	defer goleak.VerifyNone(t)

-	var done syncx.AtomicBool
+	var done int32
 	var result []int
 	err := MapReduceVoid(func(source chan<- interface{}) {
 		for i := 0; i < defaultWorkers*2; i++ {
 			source <- i
 		}
-		done.Set(true)
+		atomic.AddInt32(&done, 1)
 	}, func(item interface{}, writer Writer, cancel func(error)) {
 		i := item.(int)
 		if i == defaultWorkers/2 {
@@ -482,7 +515,7 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
 	})
 	assert.NotNil(t, err)
 	assert.Equal(t, "anything", err.Error())
-	assert.True(t, done.True())
+	assert.Equal(t, int32(1), done)
 }

 func TestMapReduceWithoutReducerWrite(t *testing.T) {
@@ -507,34 +540,51 @@ func TestMapReduceVoidPanicInReducer(t *testing.T) {
 	defer goleak.VerifyNone(t)

 	const message = "foo"
-	var done syncx.AtomicBool
-	err := MapReduceVoid(func(source chan<- interface{}) {
+	assert.Panics(t, func() {
+		var done int32
+		_ = MapReduceVoid(func(source chan<- interface{}) {
+			for i := 0; i < defaultWorkers*2; i++ {
+				source <- i
+			}
+			atomic.AddInt32(&done, 1)
+		}, func(item interface{}, writer Writer, cancel func(error)) {
+			i := item.(int)
+			writer.Write(i)
+		}, func(pipe <-chan interface{}, cancel func(error)) {
+			panic(message)
+		}, WithWorkers(1))
+	})
+}
+
+func TestForEachWithContext(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
+	var done int32
+	ctx, cancel := context.WithCancel(context.Background())
+	ForEach(func(source chan<- interface{}) {
 		for i := 0; i < defaultWorkers*2; i++ {
 			source <- i
 		}
-		done.Set(true)
-	}, func(item interface{}, writer Writer, cancel func(error)) {
-		i := item.(int)
-		writer.Write(i)
-	}, func(pipe <-chan interface{}, cancel func(error)) {
-		panic(message)
-	}, WithWorkers(1))
-	assert.NotNil(t, err)
-	assert.Equal(t, message, err.Error())
-	assert.True(t, done.True())
+		atomic.AddInt32(&done, 1)
+	}, func(item interface{}) {
+		i := item.(int)
+		if i == defaultWorkers/2 {
+			cancel()
+		}
+	}, WithContext(ctx))
 }

 func TestMapReduceWithContext(t *testing.T) {
 	defer goleak.VerifyNone(t)

-	var done syncx.AtomicBool
+	var done int32
 	var result []int
 	ctx, cancel := context.WithCancel(context.Background())
 	err := MapReduceVoid(func(source chan<- interface{}) {
 		for i := 0; i < defaultWorkers*2; i++ {
 			source <- i
 		}
-		done.Set(true)
+		atomic.AddInt32(&done, 1)
 	}, func(item interface{}, writer Writer, c func(error)) {
 		i := item.(int)
 		if i == defaultWorkers/2 {

core/prof/runtime.go

@@ -0,0 +1,30 @@
package prof

import (
	"fmt"
	"runtime"
	"time"
)

const (
	defaultInterval = time.Second * 5
	mega            = 1024 * 1024
)

func DisplayStats(interval ...time.Duration) {
	duration := defaultInterval
	for _, val := range interval {
		duration = val
	}

	go func() {
		ticker := time.NewTicker(duration)
		defer ticker.Stop()
		for range ticker.C {
			var m runtime.MemStats
			runtime.ReadMemStats(&m)
			fmt.Printf("Goroutines: %d, Alloc: %vm, TotalAlloc: %vm, Sys: %vm, NumGC: %v\n",
				runtime.NumGoroutine(), m.Alloc/mega, m.TotalAlloc/mega, m.Sys/mega, m.NumGC)
		}
	}()
}
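
Editor's note: DisplayStats above is the runtime stats monitor referenced by commit 5f9d101bc6; it samples runtime.MemStats on a ticker and prints one line per interval. A minimal usage sketch, assuming the package is imported as github.com/zeromicro/go-zero/core/prof:

package main

import (
	"time"

	"github.com/zeromicro/go-zero/core/prof"
)

func main() {
	// Report goroutine count and memory usage every 10 seconds; calling
	// DisplayStats() with no argument falls back to the 5-second default above.
	prof.DisplayStats(time.Second * 10)

	// Keep the process alive so the background ticker goroutine can fire.
	select {}
}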

readme-cn.md

@@ -11,6 +11,8 @@
 [![Release](https://img.shields.io/github/v/release/zeromicro/go-zero.svg?style=flat-square)](https://github.com/zeromicro/go-zero)
 [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

+> ***缩短从需求到上线的距离***
+
 **注意为了满足开源基金会要求go-zero 从好未来tal-tech组织下迁移至中立的 GitHub 组织zeromicro。**

 ## 0. go-zero 介绍


@@ -6,7 +6,7 @@ import (
) )
// BuildVersion is the version of goctl. // BuildVersion is the version of goctl.
const BuildVersion = "1.3.0" const BuildVersion = "1.3.0-20220201"
var tag = map[string]int{"pre-alpha": 0, "alpha": 1, "pre-bata": 2, "beta": 3, "released": 4, "": 5} var tag = map[string]int{"pre-alpha": 0, "alpha": 1, "pre-bata": 2, "beta": 3, "released": 4, "": 5}


@@ -27,7 +27,16 @@ import (
 const zeromicroVersion = "1.3.0"

-var fset = token.NewFileSet()
+const (
+	confirmUnknown = iota
+	confirmAll
+	confirmIgnore
+)
+
+var (
+	fset            = token.NewFileSet()
+	builderxConfirm = confirmUnknown
+)

 func Migrate(c *cli.Context) error {
 	verbose := c.Bool("verbose")
@@ -133,7 +142,7 @@ func rewriteFile(pkgs map[string]*ast.Package, verbose bool) error {
 		}

 		if verbose {
-			console.Debug("[...] migrate %q ... ", filepath.Base(filename))
+			console.Debug("[...] migrating %q ... ", filepath.Base(filename))
 		}

 		if strings.Contains(imp.Path.Value, deprecatedBuilderx) {
@@ -179,7 +188,7 @@ func writeFile(pkgs []*ast.Package, verbose bool) error {
 			return fmt.Errorf("[rewriteImport] write file %s error: %+v", filename, err)
 		}

 		if verbose {
-			console.Success("[OK] migrate %q success ", filepath.Base(filename))
+			console.Success("[OK] migrated %q successfully", filepath.Base(filename))
 		}
 	}
 }
@@ -239,11 +248,21 @@ func replacePkg(file *ast.File) {
 }

 func refactorBuilderx(deprecated, replacement string, fn func(allow bool)) {
+	switch builderxConfirm {
+	case confirmAll:
+		fn(true)
+		return
+	case confirmIgnore:
+		fn(false)
+		return
+	}
+
 	msg := fmt.Sprintf(`Detects a deprecated package in the source code,
 Deprecated package: %q
 Replacement package: %q
 It's recommended to use the replacement package, do you want to replace?
-[input 'Y' for yes, 'N' for no]: `, deprecated, replacement)
+['Y' for yes, 'N' for no, 'A' for all, 'I' for ignore]: `,
+		deprecated, replacement)

 	if runtime.GOOS != vars.OsWindows {
 		msg = aurora.Yellow(msg).String()
@@ -253,21 +272,23 @@ It's recommended to use the replacement package, do you want to replace?
 	for {
 		var in string
 		fmt.Scanln(&in)
-		if len(in) == 0 {
-			console.Warning("nothing input, please try again [input 'Y' for yes, 'N' for no]:")
-			continue
-		}
-
-		if strings.EqualFold(in, "Y") {
+		switch {
+		case strings.EqualFold(in, "Y"):
 			fn(true)
 			return
-		}
-		if strings.EqualFold(in, "N") {
+		case strings.EqualFold(in, "N"):
 			fn(false)
 			return
+		case strings.EqualFold(in, "A"):
+			fn(true)
+			builderxConfirm = confirmAll
+			return
+		case strings.EqualFold(in, "I"):
+			fn(false)
+			builderxConfirm = confirmIgnore
+			return
+		default:
+			console.Warning("['Y' for yes, 'N' for no, 'A' for all, 'I' for ignore]: ")
 		}
-
-		console.Warning("invalid input, please try again [input 'Y' for yes, 'N' for no]:")
 	}
 }
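
Editor's note: with the confirmation states above, answering 'A' (all) or 'I' (ignore) once is remembered in builderxConfirm, so the prompt is no longer repeated for every file that still imports the deprecated builderx package. A typical invocation would be goctl migrate --verbose; the verbose flag is taken from the c.Bool("verbose") call in this diff, other flags are assumed and not shown here.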