diff --git a/core/fx/stream.go b/core/fx/stream.go index c627e407..2f4056b1 100644 --- a/core/fx/stream.go +++ b/core/fx/stream.go @@ -90,7 +90,8 @@ func Range(source <-chan interface{}) Stream { func (s Stream) AllMach(predicate func(item interface{}) bool) bool { for item := range s.source { if !predicate(item) { - drain(s.source) + // make sure the former goroutine not block, and current func returns fast. + go drain(s.source) return false } } @@ -104,7 +105,8 @@ func (s Stream) AllMach(predicate func(item interface{}) bool) bool { func (s Stream) AnyMach(predicate func(item interface{}) bool) bool { for item := range s.source { if predicate(item) { - drain(s.source) + // make sure the former goroutine not block, and current func returns fast. + go drain(s.source) return true } } @@ -215,7 +217,7 @@ func (s Stream) First() interface{} { func (s Stream) ForAll(fn ForAllFunc) { fn(s.source) // avoid goroutine leak on fn not consuming all items. - drain(s.source) + go drain(s.source) } // ForEach seals the Stream with the ForEachFunc on each item, no successive operations. @@ -310,7 +312,8 @@ func (s Stream) Merge() Stream { func (s Stream) NoneMatch(predicate func(item interface{}) bool) bool { for item := range s.source { if predicate(item) { - drain(s.source) + // make sure the former goroutine not block, and current func returns fast. + go drain(s.source) return false } } diff --git a/core/fx/stream_test.go b/core/fx/stream_test.go index 92835993..3f5bbe5b 100644 --- a/core/fx/stream_test.go +++ b/core/fx/stream_test.go @@ -567,5 +567,5 @@ func runCheckedTest(t *testing.T, fn func(t *testing.T)) { fn(t) // let scheduler schedule first time.Sleep(time.Millisecond) - assert.Equal(t, goroutines, runtime.NumGoroutine()) + assert.True(t, runtime.NumGoroutine() <= goroutines) }