diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index e7763f0e..23d70bd0 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -102,12 +102,12 @@ func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) { options := buildOptions(opts...) panicChan := &onceChan{channel: make(chan interface{})} source := buildSource(generate, panicChan) - collector := make(chan interface{}, options.workers) + collector := make(chan interface{}) done := make(chan lang.PlaceholderType) go executeMappers(mapperContext{ ctx: options.ctx, - mapper: func(item interface{}, writer Writer) { + mapper: func(item interface{}, _ Writer) { mapper(item) }, source: source,