Compare commits

..

1 Commits

Author SHA1 Message Date
spectatorMrZ
304fb182bb Fix pg model generation without tag (#1407)
1. fix pg model struct haven't tag
2. add pg command test from datasource
2022-01-08 21:48:09 +08:00
498 changed files with 2548 additions and 7505 deletions

View File

@@ -7,50 +7,32 @@ on:
branches: [ master ] branches: [ master ]
jobs: jobs:
test-linux: build:
name: Linux name: Build
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.15
id: go
- name: Check out code into the Go module directory - name: Set up Go 1.x
uses: actions/checkout@v2 uses: actions/setup-go@v2
with:
go-version: ^1.14
id: go
- name: Get dependencies - name: Check out code into the Go module directory
run: | uses: actions/checkout@v2
go get -v -t -d ./...
- name: Lint - name: Get dependencies
run: | run: |
go vet -stdmethods=false $(go list ./...) go get -v -t -d ./...
go install mvdan.cc/gofumpt@latest
test -z "$(gofumpt -s -l -extra .)" || echo "Please run 'gofumpt -l -w -extra .'"
- name: Test - name: Lint
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./... run: |
go vet -stdmethods=false $(go list ./...)
go install mvdan.cc/gofumpt@latest
test -z "$(gofumpt -s -l -extra .)" || echo "Please run 'gofumpt -l -w -extra .'"
- name: Codecov - name: Test
uses: codecov/codecov-action@v2 run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
test-win: - name: Codecov
name: Windows uses: codecov/codecov-action@v2
runs-on: windows-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.15
- name: Checkout codebase
uses: actions/checkout@v2
- name: Test
run: |
go mod verify
go mod download
go test -v -race ./...
cd tools/goctl && go build -v goctl.go

View File

@@ -1,18 +0,0 @@
name: 'issue-translator'
on:
issue_comment:
types: [created]
issues:
types: [opened]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: tomsun28/issues-translate-action@v2.6
with:
IS_MODIFY_TITLE: true
# not require, default false, . Decide whether to modify the issue title
# if true, the robot account @Issues-translate-bot must have modification permissions, invite @Issues-translate-bot to your project or use your custom bot.
CUSTOM_BOT_NOTE: Bot detected the issue body's language is not English, translate it automatically. 👯👭🏻🧑‍🤝‍🧑👫🧑🏿‍🤝‍🧑🏻👩🏾‍🤝‍👨🏿👬🏿
# not require. Customize the translation robot prefix message.

3
.gitignore vendored
View File

@@ -16,8 +16,7 @@
**/logs **/logs
# for test purpose # for test purpose
**/adhoc adhoc
**/testdata
# gitlab ci # gitlab ci
.cache .cache

View File

@@ -40,7 +40,7 @@ We will help you to contribute in different areas like filing issues, developing
getting your work reviewed and merged. getting your work reviewed and merged.
If you have questions about the development process, If you have questions about the development process,
feel free to [file an issue](https://github.com/zeromicro/go-zero/issues/new/choose). feel free to [file an issue](https://github.com/tal-tech/go-zero/issues/new/choose).
## Find something to work on ## Find something to work on
@@ -50,10 +50,10 @@ Here is how you get started.
### Find a good first topic ### Find a good first topic
[go-zero](https://github.com/zeromicro/go-zero) has beginner-friendly issues that provide a good first issue. [go-zero](https://github.com/tal-tech/go-zero) has beginner-friendly issues that provide a good first issue.
For example, [go-zero](https://github.com/zeromicro/go-zero) has For example, [go-zero](https://github.com/tal-tech/go-zero) has
[help wanted](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and [help wanted](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
[good first issue](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22) [good first issue](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
labels for issues that should not need deep knowledge of the system. labels for issues that should not need deep knowledge of the system.
We can help new contributors who wish to work on such issues. We can help new contributors who wish to work on such issues.
@@ -79,7 +79,7 @@ This is a rough outline of what a contributor's workflow looks like:
- Create a topic branch from where to base the contribution. This is usually master. - Create a topic branch from where to base the contribution. This is usually master.
- Make commits of logical units. - Make commits of logical units.
- Push changes in a topic branch to a personal fork of the repository. - Push changes in a topic branch to a personal fork of the repository.
- Submit a pull request to [go-zero](https://github.com/zeromicro/go-zero). - Submit a pull request to [go-zero](https://github.com/tal-tech/go-zero).
## Creating Pull Requests ## Creating Pull Requests

View File

@@ -20,9 +20,9 @@ We hope that the items listed below will inspire further engagement from the com
- [x] Support `goctl bug` to report bugs conveniently - [x] Support `goctl bug` to report bugs conveniently
## 2022 ## 2022
- [x] Support `context` in redis related methods for timeout and tracing - [ ] Support `goctl mock` command to start a mocking server with given `.api` file
- [x] Support `context` in sql related methods for timeout and tracing
- [ ] Support `context` in mongodb related methods for timeout and tracing
- [ ] Add `httpx.Client` with governance, like circuit breaker etc. - [ ] Add `httpx.Client` with governance, like circuit breaker etc.
- [ ] Support `goctl doctor` command to report potential issues for given service - [ ] Support `goctl doctor` command to report potential issues for given service
- [ ] Support `goctl mock` command to start a mocking server with given `.api` file - [ ] Support `context` in redis related methods for timeout and tracing
- [ ] Support `context` in sql related methods for timeout and tracing
- [ ] Support `context` in mongodb related methods for timeout and tracing

View File

@@ -4,8 +4,8 @@ import (
"errors" "errors"
"strconv" "strconv"
"github.com/zeromicro/go-zero/core/hash" "github.com/tal-tech/go-zero/core/hash"
"github.com/zeromicro/go-zero/core/stores/redis" "github.com/tal-tech/go-zero/core/stores/redis"
) )
const ( const (

View File

@@ -4,7 +4,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stores/redis/redistest" "github.com/tal-tech/go-zero/core/stores/redis/redistest"
) )
func TestRedisBitSet_New_Set_Test(t *testing.T) { func TestRedisBitSet_New_Set_Test(t *testing.T) {

View File

@@ -6,11 +6,11 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/zeromicro/go-zero/core/mathx" "github.com/tal-tech/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/proc" "github.com/tal-tech/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/stat" "github.com/tal-tech/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
const ( const (

View File

@@ -8,7 +8,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stat" "github.com/tal-tech/go-zero/core/stat"
) )
func init() { func init() {

View File

@@ -6,7 +6,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stat" "github.com/tal-tech/go-zero/core/stat"
) )
func init() { func init() {

View File

@@ -4,8 +4,8 @@ import (
"math" "math"
"time" "time"
"github.com/zeromicro/go-zero/core/collection" "github.com/tal-tech/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/mathx" "github.com/tal-tech/go-zero/core/mathx"
) )
const ( const (

View File

@@ -7,9 +7,9 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/collection" "github.com/tal-tech/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/mathx" "github.com/tal-tech/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/stat" "github.com/tal-tech/go-zero/core/stat"
) )
const ( const (

View File

@@ -8,8 +8,8 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/iox" "github.com/tal-tech/go-zero/core/iox"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
) )
func TestEnterToContinue(t *testing.T) { func TestEnterToContinue(t *testing.T) {

View File

@@ -7,7 +7,7 @@ import (
"encoding/base64" "encoding/base64"
"errors" "errors"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
) )
// ErrPaddingSize indicates bad padding size. // ErrPaddingSize indicates bad padding size.

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs" "github.com/tal-tech/go-zero/core/fs"
) )
const ( const (

View File

@@ -6,9 +6,9 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mathx" "github.com/tal-tech/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
) )
const ( const (

View File

@@ -61,41 +61,3 @@ func TestPutMore(t *testing.T) {
assert.Equal(t, string(element), string(body.([]byte))) assert.Equal(t, string(element), string(body.([]byte)))
} }
} }
func TestPutMoreWithHeaderNotZero(t *testing.T) {
elements := [][]byte{
[]byte("hello"),
[]byte("world"),
[]byte("again"),
}
queue := NewQueue(4)
for i := range elements {
queue.Put(elements[i])
}
// take 1
body, ok := queue.Take()
assert.True(t, ok)
element, ok := body.([]byte)
assert.True(t, ok)
assert.Equal(t, element, []byte("hello"))
// put more
queue.Put([]byte("b4"))
queue.Put([]byte("b5")) // will store in elements[0]
queue.Put([]byte("b6")) // cause expansion
results := [][]byte{
[]byte("world"),
[]byte("again"),
[]byte("b4"),
[]byte("b5"),
[]byte("b6"),
}
for _, element := range results {
body, ok := queue.Take()
assert.True(t, ok)
assert.Equal(t, string(element), string(body.([]byte)))
}
}

View File

@@ -4,7 +4,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
type ( type (

View File

@@ -6,7 +6,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
) )
const duration = time.Millisecond * 50 const duration = time.Millisecond * 50

View File

@@ -4,7 +4,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
) )
func TestSafeMap(t *testing.T) { func TestSafeMap(t *testing.T) {

View File

@@ -1,8 +1,8 @@
package collection package collection
import ( import (
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
) )
const ( const (

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
) )
func init() { func init() {

View File

@@ -5,9 +5,9 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/threading" "github.com/tal-tech/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
const drainWorkers = 8 const drainWorkers = 8

View File

@@ -8,10 +8,10 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
const ( const (

View File

@@ -7,7 +7,7 @@ import (
"os" "os"
"path" "path"
"github.com/zeromicro/go-zero/core/mapping" "github.com/tal-tech/go-zero/core/mapping"
) )
var loaders = map[string]func([]byte, interface{}) error{ var loaders = map[string]func([]byte, interface{}) error{

View File

@@ -6,8 +6,8 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs" "github.com/tal-tech/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/hash" "github.com/tal-tech/go-zero/core/hash"
) )
func TestLoadConfig_notExists(t *testing.T) { func TestLoadConfig_notExists(t *testing.T) {

View File

@@ -7,7 +7,7 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/zeromicro/go-zero/core/iox" "github.com/tal-tech/go-zero/core/iox"
) )
// PropertyError represents a configuration error message. // PropertyError represents a configuration error message.

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs" "github.com/tal-tech/go-zero/core/fs"
) )
func TestProperties(t *testing.T) { func TestProperties(t *testing.T) {

View File

@@ -3,7 +3,7 @@ package contextx
import ( import (
"context" "context"
"github.com/zeromicro/go-zero/core/mapping" "github.com/tal-tech/go-zero/core/mapping"
) )
const contextTagKey = "ctx" const contextTagKey = "ctx"

View File

@@ -1,6 +1,6 @@
package discov package discov
import "github.com/zeromicro/go-zero/core/discov/internal" import "github.com/tal-tech/go-zero/core/discov/internal"
// RegisterAccount registers the username/password to the given etcd cluster. // RegisterAccount registers the username/password to the given etcd cluster.
func RegisterAccount(endpoints []string, user, pass string) { func RegisterAccount(endpoints []string, user, pass string) {

View File

@@ -4,8 +4,8 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/discov/internal" "github.com/tal-tech/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
) )
func TestRegisterAccount(t *testing.T) { func TestRegisterAccount(t *testing.T) {

View File

@@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/zeromicro/go-zero/core/discov/internal" "github.com/tal-tech/go-zero/core/discov/internal"
) )
const ( const (

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/discov/internal" "github.com/tal-tech/go-zero/core/discov/internal"
) )
var mockLock sync.Mutex var mockLock sync.Mutex

View File

@@ -2,13 +2,6 @@ package discov
import "errors" import "errors"
var (
// errEmptyEtcdHosts indicates that etcd hosts are empty.
errEmptyEtcdHosts = errors.New("empty etcd hosts")
// errEmptyEtcdKey indicates that etcd key is empty.
errEmptyEtcdKey = errors.New("empty etcd key")
)
// EtcdConf is the config item with the given key on etcd. // EtcdConf is the config item with the given key on etcd.
type EtcdConf struct { type EtcdConf struct {
Hosts []string Hosts []string
@@ -34,9 +27,9 @@ func (c EtcdConf) HasTLS() bool {
// Validate validates c. // Validate validates c.
func (c EtcdConf) Validate() error { func (c EtcdConf) Validate() error {
if len(c.Hosts) == 0 { if len(c.Hosts) == 0 {
return errEmptyEtcdHosts return errors.New("empty etcd hosts")
} else if len(c.Key) == 0 { } else if len(c.Key) == 0 {
return errEmptyEtcdKey return errors.New("empty etcd key")
} else { } else {
return nil return nil
} }

View File

@@ -4,7 +4,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
) )
func TestAccount(t *testing.T) { func TestAccount(t *testing.T) {

View File

@@ -9,11 +9,11 @@ import (
"sync" "sync"
"time" "time"
"github.com/zeromicro/go-zero/core/contextx" "github.com/tal-tech/go-zero/core/contextx"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading" "github.com/tal-tech/go-zero/core/threading"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )

View File

@@ -7,10 +7,10 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/contextx" "github.com/tal-tech/go-zero/core/contextx"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )

View File

@@ -1,12 +1,12 @@
package discov package discov
import ( import (
"github.com/zeromicro/go-zero/core/discov/internal" "github.com/tal-tech/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/proc" "github.com/tal-tech/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading" "github.com/tal-tech/go-zero/core/threading"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )

View File

@@ -8,10 +8,10 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/discov/internal" "github.com/tal-tech/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )

View File

@@ -4,9 +4,9 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/zeromicro/go-zero/core/discov/internal" "github.com/tal-tech/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
) )
type ( type (

View File

@@ -5,8 +5,8 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/discov/internal" "github.com/tal-tech/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
) )
const ( const (

View File

@@ -11,12 +11,10 @@ type (
errorArray []error errorArray []error
) )
// Add adds errs to be, nil errors are ignored. // Add adds err to be.
func (be *BatchError) Add(errs ...error) { func (be *BatchError) Add(err error) {
for _, err := range errs { if err != nil {
if err != nil { be.errs = append(be.errs, err)
be.errs = append(be.errs, err)
}
} }
} }

View File

@@ -4,7 +4,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/zeromicro/go-zero/core/threading" "github.com/tal-tech/go-zero/core/threading"
) )
// A DelayExecutor delays a tasks on given delay interval. // A DelayExecutor delays a tasks on given delay interval.

View File

@@ -3,8 +3,8 @@ package executors
import ( import (
"time" "time"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
// A LessExecutor is an executor to limit execution once within given time interval. // A LessExecutor is an executor to limit execution once within given time interval.

View File

@@ -5,7 +5,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
func TestLessExecutor_DoOrDiscard(t *testing.T) { func TestLessExecutor_DoOrDiscard(t *testing.T) {

View File

@@ -6,11 +6,11 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/proc" "github.com/tal-tech/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading" "github.com/tal-tech/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
const idleRound = 10 const idleRound = 10

View File

@@ -8,7 +8,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
const threshold = 10 const threshold = 10

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs" "github.com/tal-tech/go-zero/core/fs"
) )
const ( const (

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs" "github.com/tal-tech/go-zero/core/fs"
) )
func TestSplitLineChunks(t *testing.T) { func TestSplitLineChunks(t *testing.T) {

View File

@@ -5,9 +5,6 @@ import (
"os" "os"
) )
// errExceedFileSize indicates that the file size is exceeded.
var errExceedFileSize = errors.New("exceed file size")
// A RangeReader is used to read a range of content from a file. // A RangeReader is used to read a range of content from a file.
type RangeReader struct { type RangeReader struct {
file *os.File file *os.File
@@ -32,7 +29,7 @@ func (rr *RangeReader) Read(p []byte) (n int, err error) {
} }
if rr.stop < rr.start || rr.start >= stat.Size() { if rr.stop < rr.start || rr.start >= stat.Size() {
return 0, errExceedFileSize return 0, errors.New("exceed file size")
} }
if rr.stop-rr.start < int64(len(p)) { if rr.stop-rr.start < int64(len(p)) {

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs" "github.com/tal-tech/go-zero/core/fs"
) )
func TestRangeReader(t *testing.T) { func TestRangeReader(t *testing.T) {

View File

@@ -4,7 +4,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"github.com/zeromicro/go-zero/core/hash" "github.com/tal-tech/go-zero/core/hash"
) )
// TempFileWithText creates the temporary file with the given content, // TempFileWithText creates the temporary file with the given content,

View File

@@ -1,6 +1,6 @@
package fx package fx
import "github.com/zeromicro/go-zero/core/threading" import "github.com/tal-tech/go-zero/core/threading"
// Parallel runs fns parallelly and waits for done. // Parallel runs fns parallelly and waits for done.
func Parallel(fns ...func()) { func Parallel(fns ...func()) {

View File

@@ -1,6 +1,6 @@
package fx package fx
import "github.com/zeromicro/go-zero/core/errorx" import "github.com/tal-tech/go-zero/core/errorx"
const defaultRetryTimes = 3 const defaultRetryTimes = 3

View File

@@ -4,9 +4,9 @@ import (
"sort" "sort"
"sync" "sync"
"github.com/zeromicro/go-zero/core/collection" "github.com/tal-tech/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/threading" "github.com/tal-tech/go-zero/core/threading"
) )
const ( const (

View File

@@ -13,8 +13,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
"go.uber.org/goleak"
) )
func TestBuffer(t *testing.T) { func TestBuffer(t *testing.T) {
@@ -564,6 +563,9 @@ func equal(t *testing.T, stream Stream, data []interface{}) {
} }
func runCheckedTest(t *testing.T, fn func(t *testing.T)) { func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
defer goleak.VerifyNone(t) goroutines := runtime.NumGoroutine()
fn(t) fn(t)
// let scheduler schedule first
time.Sleep(time.Millisecond)
assert.True(t, runtime.NumGoroutine() <= goroutines)
} }

View File

@@ -6,8 +6,8 @@ import (
"strconv" "strconv"
"sync" "sync"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/mapping" "github.com/tal-tech/go-zero/core/mapping"
) )
const ( const (

View File

@@ -6,7 +6,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/mathx" "github.com/tal-tech/go-zero/core/mathx"
) )
const ( const (

View File

@@ -9,8 +9,8 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs" "github.com/tal-tech/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
) )
func TestReadText(t *testing.T) { func TestReadText(t *testing.T) {

View File

@@ -51,5 +51,5 @@ func unmarshalUseNumber(decoder *json.Decoder, v interface{}) error {
} }
func formatError(v string, err error) error { func formatError(v string, err error) error {
return fmt.Errorf("string: `%s`, error: `%w`", v, err) return fmt.Errorf("string: `%s`, error: `%s`", v, err.Error())
} }

View File

@@ -5,11 +5,12 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/zeromicro/go-zero/core/stores/redis" "github.com/tal-tech/go-zero/core/stores/redis"
) )
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key const (
const periodScript = `local limit = tonumber(ARGV[1]) // to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
periodScript = `local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2]) local window = tonumber(ARGV[2])
local current = redis.call("INCRBY", KEYS[1], 1) local current = redis.call("INCRBY", KEYS[1], 1)
if current == 1 then if current == 1 then
@@ -22,6 +23,8 @@ elseif current == limit then
else else
return 0 return 0
end` end`
zoneDiff = 3600 * 8 // GMT+8 for our services
)
const ( const (
// Unknown means not initialized state. // Unknown means not initialized state.
@@ -101,9 +104,7 @@ func (h *PeriodLimit) Take(key string) (int, error) {
func (h *PeriodLimit) calcExpireSeconds() int { func (h *PeriodLimit) calcExpireSeconds() int {
if h.align { if h.align {
now := time.Now() unix := time.Now().Unix() + zoneDiff
_, offset := now.Zone()
unix := now.Unix() + int64(offset)
return h.period - int(unix%int64(h.period)) return h.period - int(unix%int64(h.period))
} }
@@ -111,8 +112,6 @@ func (h *PeriodLimit) calcExpireSeconds() int {
} }
// Align returns a func to customize a PeriodLimit with alignment. // Align returns a func to customize a PeriodLimit with alignment.
// For example, if we want to limit end users with 5 sms verification messages every day,
// we need to align with the local timezone and the start of the day.
func Align() PeriodOption { func Align() PeriodOption {
return func(l *PeriodLimit) { return func(l *PeriodLimit) {
l.align = true l.align = true

View File

@@ -5,8 +5,8 @@ import (
"github.com/alicebob/miniredis/v2" "github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stores/redis" "github.com/tal-tech/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/redis/redistest" "github.com/tal-tech/go-zero/core/stores/redis/redistest"
) )
func TestPeriodLimit_Take(t *testing.T) { func TestPeriodLimit_Take(t *testing.T) {
@@ -23,9 +23,10 @@ func TestPeriodLimit_RedisUnavailable(t *testing.T) {
const ( const (
seconds = 1 seconds = 1
total = 100
quota = 5 quota = 5
) )
l := NewPeriodLimit(seconds, quota, redis.New(s.Addr()), "periodlimit") l := NewPeriodLimit(seconds, quota, redis.NewRedis(s.Addr(), redis.NodeType), "periodlimit")
s.Close() s.Close()
val, err := l.Take("first") val, err := l.Take("first")
assert.NotNil(t, err) assert.NotNil(t, err)

View File

@@ -7,8 +7,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis" "github.com/tal-tech/go-zero/core/stores/redis"
xrate "golang.org/x/time/rate" xrate "golang.org/x/time/rate"
) )
@@ -85,8 +85,8 @@ func (lim *TokenLimiter) Allow() bool {
} }
// AllowN reports whether n events may happen at time now. // AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate. // Use this method if you intend to drop / skip events that exceed the rate rate.
// Otherwise, use Reserve or Wait. // Otherwise use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool { func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n) return lim.reserveN(now, n)
} }
@@ -112,8 +112,7 @@ func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
// Lua boolean false -> r Nil bulk reply // Lua boolean false -> r Nil bulk reply
if err == redis.Nil { if err == redis.Nil {
return false return false
} } else if err != nil {
if err != nil {
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err) logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
lim.startMonitor() lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n) return lim.rescueLimiter.AllowN(now, n)

View File

@@ -6,9 +6,9 @@ import (
"github.com/alicebob/miniredis/v2" "github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis" "github.com/tal-tech/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/redis/redistest" "github.com/tal-tech/go-zero/core/stores/redis/redistest"
) )
func init() { func init() {

View File

@@ -7,11 +7,11 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/zeromicro/go-zero/core/collection" "github.com/tal-tech/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stat" "github.com/tal-tech/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
const ( const (

View File

@@ -8,11 +8,11 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/collection" "github.com/tal-tech/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mathx" "github.com/tal-tech/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/stat" "github.com/tal-tech/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
) )
const ( const (

View File

@@ -3,7 +3,7 @@ package load
import ( import (
"io" "io"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
) )
// A ShedderGroup is a manager to manage key based shedders. // A ShedderGroup is a manager to manage key based shedders.

View File

@@ -4,8 +4,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stat" "github.com/tal-tech/go-zero/core/stat"
) )
type ( type (

View File

@@ -3,11 +3,10 @@ package logx
// A LogConf is a logging config. // A LogConf is a logging config.
type LogConf struct { type LogConf struct {
ServiceName string `json:",optional"` ServiceName string `json:",optional"`
Mode string `json:",default=console,options=[console,file,volume]"` Mode string `json:",default=console,options=console|file|volume"`
Encoding string `json:",default=json,options=[json,plain]"`
TimeFormat string `json:",optional"` TimeFormat string `json:",optional"`
Path string `json:",default=logs"` Path string `json:",default=logs"`
Level string `json:",default=info,options=[info,error,severe]"` Level string `json:",default=info,options=info|error|severe"`
Compress bool `json:",optional"` Compress bool `json:",optional"`
KeepDays int `json:",optional"` KeepDays int `json:",optional"`
StackCooldownMillis int `json:",default=100"` StackCooldownMillis int `json:",default=100"`

View File

@@ -3,10 +3,9 @@ package logx
import ( import (
"fmt" "fmt"
"io" "io"
"sync/atomic"
"time" "time"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
const durationCallerDepth = 3 const durationCallerDepth = 3
@@ -80,15 +79,10 @@ func (l *durationLogger) WithDuration(duration time.Duration) Logger {
} }
func (l *durationLogger) write(writer io.Writer, level string, val interface{}) { func (l *durationLogger) write(writer io.Writer, level string, val interface{}) {
switch atomic.LoadUint32(&encoding) { outputJson(writer, &durationLogger{
case plainEncodingType: Timestamp: getTimestamp(),
writePlainAny(writer, level, val, l.Duration) Level: level,
default: Content: val,
outputJson(writer, &durationLogger{ Duration: l.Duration,
Timestamp: getTimestamp(), })
Level: level,
Content: val,
Duration: l.Duration,
})
}
} }

View File

@@ -3,7 +3,6 @@ package logx
import ( import (
"log" "log"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@@ -38,19 +37,6 @@ func TestWithDurationInfo(t *testing.T) {
assert.True(t, strings.Contains(builder.String(), "duration"), builder.String()) assert.True(t, strings.Contains(builder.String(), "duration"), builder.String())
} }
func TestWithDurationInfoConsole(t *testing.T) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
var builder strings.Builder
log.SetOutput(&builder)
WithDuration(time.Second).Info("foo")
assert.True(t, strings.Contains(builder.String(), "ms"), builder.String())
}
func TestWithDurationInfof(t *testing.T) { func TestWithDurationInfof(t *testing.T) {
var builder strings.Builder var builder strings.Builder
log.SetOutput(&builder) log.SetOutput(&builder)

View File

@@ -4,8 +4,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/zeromicro/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
type limitedExecutor struct { type limitedExecutor struct {

View File

@@ -6,7 +6,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
func TestLimitedExecutor_logOrDiscard(t *testing.T) { func TestLimitedExecutor_logOrDiscard(t *testing.T) {

View File

@@ -1,7 +1,6 @@
package logx package logx
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -18,9 +17,9 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/zeromicro/go-zero/core/iox" "github.com/tal-tech/go-zero/core/iox"
"github.com/zeromicro/go-zero/core/sysx" "github.com/tal-tech/go-zero/core/sysx"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
const ( const (
@@ -32,15 +31,6 @@ const (
SevereLevel SevereLevel
) )
const (
jsonEncodingType = iota
plainEncodingType
jsonEncoding = "json"
plainEncoding = "plain"
plainEncodingSep = '\t'
)
const ( const (
accessFilename = "access.log" accessFilename = "access.log"
errorFilename = "error.log" errorFilename = "error.log"
@@ -72,10 +62,9 @@ var (
// ErrLogServiceNameNotSet is an error that indicates that the service name is not set. // ErrLogServiceNameNotSet is an error that indicates that the service name is not set.
ErrLogServiceNameNotSet = errors.New("log service name must be set") ErrLogServiceNameNotSet = errors.New("log service name must be set")
timeFormat = "2006-01-02T15:04:05.000Z07:00" timeFormat = "2006-01-02T15:04:05.000Z07"
writeConsole bool writeConsole bool
logLevel uint32 logLevel uint32
encoding uint32 = jsonEncodingType
// use uint32 for atomic operations // use uint32 for atomic operations
disableStat uint32 disableStat uint32
infoLog io.WriteCloser infoLog io.WriteCloser
@@ -135,12 +124,6 @@ func SetUp(c LogConf) error {
if len(c.TimeFormat) > 0 { if len(c.TimeFormat) > 0 {
timeFormat = c.TimeFormat timeFormat = c.TimeFormat
} }
switch c.Encoding {
case plainEncoding:
atomic.StoreUint32(&encoding, plainEncodingType)
default:
atomic.StoreUint32(&encoding, jsonEncodingType)
}
switch c.Mode { switch c.Mode {
case consoleMode: case consoleMode:
@@ -424,31 +407,21 @@ func infoTextSync(msg string) {
} }
func outputAny(writer io.Writer, level string, val interface{}) { func outputAny(writer io.Writer, level string, val interface{}) {
switch atomic.LoadUint32(&encoding) { info := logEntry{
case plainEncodingType: Timestamp: getTimestamp(),
writePlainAny(writer, level, val) Level: level,
default: Content: val,
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: val,
}
outputJson(writer, info)
} }
outputJson(writer, info)
} }
func outputText(writer io.Writer, level, msg string) { func outputText(writer io.Writer, level, msg string) {
switch atomic.LoadUint32(&encoding) { info := logEntry{
case plainEncodingType: Timestamp: getTimestamp(),
writePlainText(writer, level, msg) Level: level,
default: Content: msg,
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: msg,
}
outputJson(writer, info)
} }
outputJson(writer, info)
} }
func outputError(writer io.Writer, msg string, callDepth int) { func outputError(writer io.Writer, msg string, callDepth int) {
@@ -592,62 +565,6 @@ func statSync(msg string) {
} }
} }
func writePlainAny(writer io.Writer, level string, val interface{}, fields ...string) {
switch v := val.(type) {
case string:
writePlainText(writer, level, v, fields...)
case error:
writePlainText(writer, level, v.Error(), fields...)
case fmt.Stringer:
writePlainText(writer, level, v.String(), fields...)
default:
var buf bytes.Buffer
buf.WriteString(getTimestamp())
buf.WriteByte(plainEncodingSep)
buf.WriteString(level)
for _, item := range fields {
buf.WriteByte(plainEncodingSep)
buf.WriteString(item)
}
buf.WriteByte(plainEncodingSep)
if err := json.NewEncoder(&buf).Encode(val); err != nil {
log.Println(err.Error())
return
}
buf.WriteByte('\n')
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
log.Println(buf.String())
return
}
if _, err := writer.Write(buf.Bytes()); err != nil {
log.Println(err.Error())
}
}
}
func writePlainText(writer io.Writer, level, msg string, fields ...string) {
var buf bytes.Buffer
buf.WriteString(getTimestamp())
buf.WriteByte(plainEncodingSep)
buf.WriteString(level)
for _, item := range fields {
buf.WriteByte(plainEncodingSep)
buf.WriteString(item)
}
buf.WriteByte(plainEncodingSep)
buf.WriteString(msg)
buf.WriteByte('\n')
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
log.Println(buf.String())
return
}
if _, err := writer.Write(buf.Bytes()); err != nil {
log.Println(err.Error())
}
}
type logWriter struct { type logWriter struct {
logger *log.Logger logger *log.Logger
} }

View File

@@ -141,78 +141,6 @@ func TestStructedLogInfov(t *testing.T) {
}) })
} }
func TestStructedLogInfoConsoleAny(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(v)
})
}
func TestStructedLogInfoConsoleAnyString(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(fmt.Sprint(v...))
})
}
func TestStructedLogInfoConsoleAnyError(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(errors.New(fmt.Sprint(v...)))
})
}
func TestStructedLogInfoConsoleAnyStringer(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(ValStringer{
val: fmt.Sprint(v...),
})
})
}
func TestStructedLogInfoConsoleText(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Info(fmt.Sprint(v...))
})
}
func TestStructedLogSlow(t *testing.T) { func TestStructedLogSlow(t *testing.T) {
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) { doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
slowLog = writer slowLog = writer
@@ -504,17 +432,6 @@ func doTestStructedLog(t *testing.T, level string, setup func(writer io.WriteClo
assert.True(t, strings.Contains(val, message)) assert.True(t, strings.Contains(val, message))
} }
func doTestStructedLogConsole(t *testing.T, setup func(writer io.WriteCloser),
write func(...interface{})) {
const message = "hello there"
writer := new(mockWriter)
setup(writer)
atomic.StoreUint32(&initialized, 1)
write(message)
println(writer.String())
assert.True(t, strings.Contains(writer.String(), message))
}
func testSetLevelTwiceWithMode(t *testing.T, mode string) { func testSetLevelTwiceWithMode(t *testing.T, mode string) {
SetUp(LogConf{ SetUp(LogConf{
Mode: mode, Mode: mode,
@@ -539,11 +456,3 @@ func testSetLevelTwiceWithMode(t *testing.T, mode string) {
ErrorStackf(message) ErrorStackf(message)
assert.Equal(t, 0, writer.builder.Len()) assert.Equal(t, 0, writer.builder.Len())
} }
type ValStringer struct {
val string
}
func (v ValStringer) String() string {
return v.val
}

View File

@@ -13,9 +13,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/zeromicro/go-zero/core/fs" "github.com/tal-tech/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
) )
const ( const (

View File

@@ -8,7 +8,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs" "github.com/tal-tech/go-zero/core/fs"
) )
func TestDailyRotateRuleMarkRotated(t *testing.T) { func TestDailyRotateRuleMarkRotated(t *testing.T) {

View File

@@ -29,9 +29,9 @@ func TestRedirector(t *testing.T) {
} }
func captureOutput(f func()) string { func captureOutput(f func()) string {
atomic.StoreUint32(&initialized, 1)
writer := new(mockWriter) writer := new(mockWriter)
infoLog = writer infoLog = writer
atomic.StoreUint32(&initialized, 1)
prevLevel := atomic.LoadUint32(&logLevel) prevLevel := atomic.LoadUint32(&logLevel)
SetLevel(InfoLevel) SetLevel(InfoLevel)
@@ -44,9 +44,5 @@ func captureOutput(f func()) string {
func getContent(jsonStr string) string { func getContent(jsonStr string) string {
var entry logEntry var entry logEntry
json.Unmarshal([]byte(jsonStr), &entry) json.Unmarshal([]byte(jsonStr), &entry)
val, ok := entry.Content.(string) return entry.Content.(string)
if ok {
return val
}
return ""
} }

View File

@@ -4,10 +4,9 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"sync/atomic"
"time" "time"
"github.com/zeromicro/go-zero/core/timex" "github.com/tal-tech/go-zero/core/timex"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
@@ -78,24 +77,16 @@ func (l *traceLogger) WithDuration(duration time.Duration) Logger {
} }
func (l *traceLogger) write(writer io.Writer, level string, val interface{}) { func (l *traceLogger) write(writer io.Writer, level string, val interface{}) {
traceID := traceIdFromContext(l.ctx) outputJson(writer, &traceLogger{
spanID := spanIdFromContext(l.ctx) logEntry: logEntry{
Timestamp: getTimestamp(),
switch atomic.LoadUint32(&encoding) { Level: level,
case plainEncodingType: Duration: l.Duration,
writePlainAny(writer, level, val, l.Duration, traceID, spanID) Content: val,
default: },
outputJson(writer, &traceLogger{ Trace: traceIdFromContext(l.ctx),
logEntry: logEntry{ Span: spanIdFromContext(l.ctx),
Timestamp: getTimestamp(), })
Level: level,
Duration: l.Duration,
Content: val,
},
Trace: traceID,
Span: spanID,
})
}
} }
// WithContext sets ctx to log, for keeping tracing information. // WithContext sets ctx to log, for keeping tracing information.

View File

@@ -82,37 +82,6 @@ func TestTraceInfo(t *testing.T) {
assert.True(t, strings.Contains(buf.String(), spanKey)) assert.True(t, strings.Contains(buf.String(), spanKey))
} }
func TestTraceInfoConsole(t *testing.T) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, jsonEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
var buf mockWriter
atomic.StoreUint32(&initialized, 1)
infoLog = newLogWriter(log.New(&buf, "", flags))
otp := otel.GetTracerProvider()
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
defer otel.SetTracerProvider(otp)
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
l := WithContext(ctx).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Info(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
buf.Reset()
l.WithDuration(time.Second).Infof(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
buf.Reset()
l.WithDuration(time.Second).Infov(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
}
func TestTraceSlow(t *testing.T) { func TestTraceSlow(t *testing.T) {
var buf mockWriter var buf mockWriter
atomic.StoreUint32(&initialized, 1) atomic.StoreUint32(&initialized, 1)

View File

@@ -3,7 +3,7 @@ package mapping
import ( import (
"io" "io"
"github.com/zeromicro/go-zero/core/jsonx" "github.com/tal-tech/go-zero/core/jsonx"
) )
const jsonTagKey = "json" const jsonTagKey = "json"

View File

@@ -9,9 +9,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/zeromicro/go-zero/core/jsonx" "github.com/tal-tech/go-zero/core/jsonx"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
) )
const ( const (
@@ -742,9 +742,7 @@ func getValueWithChainedKeys(m Valuer, keys []string) (interface{}, bool) {
if len(keys) == 1 { if len(keys) == 1 {
v, ok := m.Value(keys[0]) v, ok := m.Value(keys[0])
return v, ok return v, ok
} } else if len(keys) > 1 {
if len(keys) > 1 {
if v, ok := m.Value(keys[0]); ok { if v, ok := m.Value(keys[0]); ok {
if nextm, ok := v.(map[string]interface{}); ok { if nextm, ok := v.(map[string]interface{}); ok {
return getValueWithChainedKeys(MapValuer(nextm), keys[1:]) return getValueWithChainedKeys(MapValuer(nextm), keys[1:])

View File

@@ -8,7 +8,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
) )
// because json.Number doesn't support strconv.ParseUint(...), // because json.Number doesn't support strconv.ParseUint(...),

View File

@@ -10,7 +10,7 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
) )
const ( const (

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"io" "io"
"io/ioutil"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
@@ -13,7 +14,7 @@ const yamlTagKey = "json"
var ( var (
// ErrUnsupportedType is an error that indicates the config format is not supported. // ErrUnsupportedType is an error that indicates the config format is not supported.
ErrUnsupportedType = errors.New("only map-like configs are supported") ErrUnsupportedType = errors.New("only map-like configs are suported")
yamlUnmarshaler = NewUnmarshaler(yamlTagKey) yamlUnmarshaler = NewUnmarshaler(yamlTagKey)
) )
@@ -28,6 +29,39 @@ func UnmarshalYamlReader(reader io.Reader, v interface{}) error {
return unmarshalYamlReader(reader, v, yamlUnmarshaler) return unmarshalYamlReader(reader, v, yamlUnmarshaler)
} }
func unmarshalYamlBytes(content []byte, v interface{}, unmarshaler *Unmarshaler) error {
var o interface{}
if err := yamlUnmarshal(content, &o); err != nil {
return err
}
if m, ok := o.(map[string]interface{}); ok {
return unmarshaler.Unmarshal(m, v)
}
return ErrUnsupportedType
}
func unmarshalYamlReader(reader io.Reader, v interface{}, unmarshaler *Unmarshaler) error {
content, err := ioutil.ReadAll(reader)
if err != nil {
return err
}
return unmarshalYamlBytes(content, v, unmarshaler)
}
// yamlUnmarshal YAML to map[string]interface{} instead of map[interface{}]interface{}.
func yamlUnmarshal(in []byte, out interface{}) error {
var res interface{}
if err := yaml.Unmarshal(in, &res); err != nil {
return err
}
*out.(*interface{}) = cleanupMapValue(res)
return nil
}
func cleanupInterfaceMap(in map[interface{}]interface{}) map[string]interface{} { func cleanupInterfaceMap(in map[interface{}]interface{}) map[string]interface{} {
res := make(map[string]interface{}) res := make(map[string]interface{})
for k, v := range in { for k, v := range in {
@@ -62,40 +96,3 @@ func cleanupMapValue(v interface{}) interface{} {
return Repr(v) return Repr(v)
} }
} }
func unmarshal(unmarshaler *Unmarshaler, o interface{}, v interface{}) error {
if m, ok := o.(map[string]interface{}); ok {
return unmarshaler.Unmarshal(m, v)
}
return ErrUnsupportedType
}
func unmarshalYamlBytes(content []byte, v interface{}, unmarshaler *Unmarshaler) error {
var o interface{}
if err := yamlUnmarshal(content, &o); err != nil {
return err
}
return unmarshal(unmarshaler, o, v)
}
func unmarshalYamlReader(reader io.Reader, v interface{}, unmarshaler *Unmarshaler) error {
var res interface{}
if err := yaml.NewDecoder(reader).Decode(&res); err != nil {
return err
}
return unmarshal(unmarshaler, cleanupMapValue(res), v)
}
// yamlUnmarshal YAML to map[string]interface{} instead of map[interface{}]interface{}.
func yamlUnmarshal(in []byte, out interface{}) error {
var res interface{}
if err := yaml.Unmarshal(in, &res); err != nil {
return err
}
*out.(*interface{}) = cleanupMapValue(res)
return nil
}

View File

@@ -926,17 +926,14 @@ func TestUnmarshalYamlBytesError(t *testing.T) {
} }
func TestUnmarshalYamlReaderError(t *testing.T) { func TestUnmarshalYamlReaderError(t *testing.T) {
payload := `abcd: cdef`
reader := strings.NewReader(payload)
var v struct { var v struct {
Any string Any string
} }
reader := strings.NewReader(`abcd: cdef`)
err := UnmarshalYamlReader(reader, &v) err := UnmarshalYamlReader(reader, &v)
assert.NotNil(t, err) assert.NotNil(t, err)
reader = strings.NewReader("chenquan")
err = UnmarshalYamlReader(reader, &v)
assert.ErrorIs(t, err, ErrUnsupportedType)
} }
func TestUnmarshalYamlBadReader(t *testing.T) { func TestUnmarshalYamlBadReader(t *testing.T) {
@@ -1014,6 +1011,6 @@ func TestUnmarshalYamlMapRune(t *testing.T) {
type badReader struct{} type badReader struct{}
func (b *badReader) Read(_ []byte) (n int, err error) { func (b *badReader) Read(p []byte) (n int, err error) {
return 0, io.ErrLimitReached return 0, io.ErrLimitReached
} }

View File

@@ -4,7 +4,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx" "github.com/tal-tech/go-zero/core/stringx"
) )
func TestMaxInt(t *testing.T) { func TestMaxInt(t *testing.T) {

View File

@@ -2,7 +2,7 @@ package metric
import ( import (
prom "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_golang/prometheus"
"github.com/zeromicro/go-zero/core/proc" "github.com/tal-tech/go-zero/core/proc"
) )
type ( type (

View File

@@ -2,7 +2,7 @@ package metric
import ( import (
prom "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_golang/prometheus"
"github.com/zeromicro/go-zero/core/proc" "github.com/tal-tech/go-zero/core/proc"
) )
type ( type (

View File

@@ -2,7 +2,7 @@ package metric
import ( import (
prom "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_golang/prometheus"
"github.com/zeromicro/go-zero/core/proc" "github.com/tal-tech/go-zero/core/proc"
) )
type ( type (

View File

@@ -3,11 +3,12 @@ package mr
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"sync" "sync"
"sync/atomic"
"github.com/zeromicro/go-zero/core/errorx" "github.com/tal-tech/go-zero/core/errorx"
"github.com/zeromicro/go-zero/core/lang" "github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/threading"
) )
const ( const (
@@ -23,12 +24,12 @@ var (
) )
type ( type (
// ForEachFunc is used to do element processing, but no output.
ForEachFunc func(item interface{})
// GenerateFunc is used to let callers send elements into source. // GenerateFunc is used to let callers send elements into source.
GenerateFunc func(source chan<- interface{}) GenerateFunc func(source chan<- interface{})
// MapFunc is used to do element processing and write the output to writer. // MapFunc is used to do element processing and write the output to writer.
MapFunc func(item interface{}, writer Writer) MapFunc func(item interface{}, writer Writer)
// VoidMapFunc is used to do element processing, but no output.
VoidMapFunc func(item interface{})
// MapperFunc is used to do element processing and write the output to writer, // MapperFunc is used to do element processing and write the output to writer,
// use cancel func to cancel the processing. // use cancel func to cancel the processing.
MapperFunc func(item interface{}, writer Writer, cancel func(error)) MapperFunc func(item interface{}, writer Writer, cancel func(error))
@@ -41,16 +42,6 @@ type (
// Option defines the method to customize the mapreduce. // Option defines the method to customize the mapreduce.
Option func(opts *mapReduceOptions) Option func(opts *mapReduceOptions)
mapperContext struct {
ctx context.Context
mapper MapFunc
source <-chan interface{}
panicChan *onceChan
collector chan<- interface{}
doneChan <-chan lang.PlaceholderType
workers int
}
mapReduceOptions struct { mapReduceOptions struct {
ctx context.Context ctx context.Context
workers int workers int
@@ -78,6 +69,7 @@ func Finish(fns ...func() error) error {
cancel(err) cancel(err)
} }
}, func(pipe <-chan interface{}, cancel func(error)) { }, func(pipe <-chan interface{}, cancel func(error)) {
drain(pipe)
}, WithWorkers(len(fns))) }, WithWorkers(len(fns)))
} }
@@ -87,7 +79,7 @@ func FinishVoid(fns ...func()) {
return return
} }
ForEach(func(source chan<- interface{}) { MapVoid(func(source chan<- interface{}) {
for _, fn := range fns { for _, fn := range fns {
source <- fn source <- fn
} }
@@ -97,74 +89,41 @@ func FinishVoid(fns ...func()) {
}, WithWorkers(len(fns))) }, WithWorkers(len(fns)))
} }
// ForEach maps all elements from given generate but no output. // Map maps all elements generated from given generate func, and returns an output channel.
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) { func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
options := buildOptions(opts...) options := buildOptions(opts...)
panicChan := &onceChan{channel: make(chan interface{})} source := buildSource(generate)
source := buildSource(generate, panicChan)
collector := make(chan interface{}, options.workers) collector := make(chan interface{}, options.workers)
done := make(chan lang.PlaceholderType) done := make(chan lang.PlaceholderType)
go executeMappers(mapperContext{ go executeMappers(options.ctx, mapper, source, collector, done, options.workers)
ctx: options.ctx,
mapper: func(item interface{}, writer Writer) {
mapper(item)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})
for { return collector
select {
case v := <-panicChan.channel:
panic(v)
case _, ok := <-collector:
if !ok {
return
}
}
}
} }
// MapReduce maps all elements generated from given generate func, // MapReduce maps all elements generated from given generate func,
// and reduces the output elements with given reducer. // and reduces the output elements with given reducer.
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) { opts ...Option) (interface{}, error) {
panicChan := &onceChan{channel: make(chan interface{})} source := buildSource(generate)
source := buildSource(generate, panicChan) return MapReduceWithSource(source, mapper, reducer, opts...)
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
} }
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer. // MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer.
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) { opts ...Option) (interface{}, error) {
panicChan := &onceChan{channel: make(chan interface{})}
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
}
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
reducer ReducerFunc, opts ...Option) (interface{}, error) {
options := buildOptions(opts...) options := buildOptions(opts...)
// output is used to write the final result
output := make(chan interface{}) output := make(chan interface{})
defer func() { defer func() {
// reducer can only write once, if more, panic
for range output { for range output {
panic("more than one element written in reducer") panic("more than one element written in reducer")
} }
}() }()
// collector is used to collect data from mapper, and consume in reducer
collector := make(chan interface{}, options.workers) collector := make(chan interface{}, options.workers)
// if done is closed, all mappers and reducer should stop processing
done := make(chan lang.PlaceholderType) done := make(chan lang.PlaceholderType)
writer := newGuardedWriter(options.ctx, output, done) writer := newGuardedWriter(options.ctx, output, done)
var closeOnce sync.Once var closeOnce sync.Once
// use atomic.Value to avoid data race
var retErr errorx.AtomicError var retErr errorx.AtomicError
finish := func() { finish := func() {
closeOnce.Do(func() { closeOnce.Do(func() {
@@ -186,41 +145,28 @@ func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapp
go func() { go func() {
defer func() { defer func() {
drain(collector) drain(collector)
if r := recover(); r != nil { if r := recover(); r != nil {
panicChan.write(r) cancel(fmt.Errorf("%v", r))
} else {
finish()
} }
finish()
}() }()
reducer(collector, writer, cancel) reducer(collector, writer, cancel)
}() }()
go executeMappers(mapperContext{ go executeMappers(options.ctx, func(item interface{}, w Writer) {
ctx: options.ctx, mapper(item, w, cancel)
mapper: func(item interface{}, w Writer) { }, source, collector, done, options.workers)
mapper(item, w, cancel)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})
select { value, ok := <-output
case <-options.ctx.Done(): if err := retErr.Load(); err != nil {
cancel(context.DeadlineExceeded) return nil, err
return nil, context.DeadlineExceeded } else if ok {
case v := <-panicChan.channel: return value, nil
panic(v) } else {
case v, ok := <-output: return nil, ErrReduceNoOutput
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return v, nil
} else {
return nil, ErrReduceNoOutput
}
} }
} }
@@ -229,14 +175,20 @@ func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapp
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { _, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
reducer(input, cancel) reducer(input, cancel)
// We need to write a placeholder to let MapReduce to continue on reducer done,
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
writer.Write(lang.Placeholder)
}, opts...) }, opts...)
if errors.Is(err, ErrReduceNoOutput) {
return nil
}
return err return err
} }
// MapVoid maps all elements from given generate but no output.
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option) {
drain(Map(generate, func(item interface{}, writer Writer) {
mapper(item)
}, opts...))
}
// WithContext customizes a mapreduce processing accepts a given ctx. // WithContext customizes a mapreduce processing accepts a given ctx.
func WithContext(ctx context.Context) Option { func WithContext(ctx context.Context) Option {
return func(opts *mapReduceOptions) { return func(opts *mapReduceOptions) {
@@ -264,18 +216,12 @@ func buildOptions(opts ...Option) *mapReduceOptions {
return options return options
} }
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} { func buildSource(generate GenerateFunc) chan interface{} {
source := make(chan interface{}) source := make(chan interface{})
go func() { threading.GoSafe(func() {
defer func() { defer close(source)
if r := recover(); r != nil {
panicChan.write(r)
}
close(source)
}()
generate(source) generate(source)
}() })
return source return source
} }
@@ -287,43 +233,39 @@ func drain(channel <-chan interface{}) {
} }
} }
func executeMappers(mCtx mapperContext) { func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
var wg sync.WaitGroup var wg sync.WaitGroup
defer func() { defer func() {
wg.Wait() wg.Wait()
close(mCtx.collector) close(collector)
drain(mCtx.source)
}() }()
var failed int32 pool := make(chan lang.PlaceholderType, workers)
pool := make(chan lang.PlaceholderType, mCtx.workers) writer := newGuardedWriter(ctx, collector, done)
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan) for {
for atomic.LoadInt32(&failed) == 0 {
select { select {
case <-mCtx.ctx.Done(): case <-ctx.Done():
return return
case <-mCtx.doneChan: case <-done:
return return
case pool <- lang.Placeholder: case pool <- lang.Placeholder:
item, ok := <-mCtx.source item, ok := <-input
if !ok { if !ok {
<-pool <-pool
return return
} }
wg.Add(1) wg.Add(1)
go func() { // better to safely run caller defined method
threading.GoSafe(func() {
defer func() { defer func() {
if r := recover(); r != nil {
atomic.AddInt32(&failed, 1)
mCtx.panicChan.write(r)
}
wg.Done() wg.Done()
<-pool <-pool
}() }()
mCtx.mapper(item, writer) mapper(item, writer)
}() })
} }
} }
} }
@@ -369,16 +311,3 @@ func (gw guardedWriter) Write(v interface{}) {
gw.channel <- v gw.channel <- v
} }
} }
type onceChan struct {
channel chan interface{}
wrote int32
}
func (oc *onceChan) write(val interface{}) {
if atomic.AddInt32(&oc.wrote, 1) > 1 {
return
}
oc.channel <- val
}

View File

@@ -1,78 +0,0 @@
//go:build go1.18
// +build go1.18
package mr
import (
"fmt"
"math/rand"
"runtime"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)
func FuzzMapReduce(f *testing.F) {
rand.Seed(time.Now().UnixNano())
f.Add(uint(10), uint(runtime.NumCPU()))
f.Fuzz(func(t *testing.T, num uint, workers uint) {
n := int64(num)%5000 + 5000
genPanic := rand.Intn(100) == 0
mapperPanic := rand.Intn(100) == 0
reducerPanic := rand.Intn(100) == 0
genIdx := rand.Int63n(n)
mapperIdx := rand.Int63n(n)
reducerIdx := rand.Int63n(n)
squareSum := (n - 1) * n * (2*n - 1) / 6
fn := func() (interface{}, error) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
return MapReduce(func(source chan<- interface{}) {
for i := int64(0); i < n; i++ {
source <- i
if genPanic && i == genIdx {
panic("foo")
}
}
}, func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int64)
if mapperPanic && v == mapperIdx {
panic("bar")
}
writer.Write(v * v)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var idx int64
var total int64
for v := range pipe {
if reducerPanic && idx == reducerIdx {
panic("baz")
}
total += v.(int64)
idx++
}
writer.Write(total)
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
}
if genPanic || mapperPanic || reducerPanic {
var buf strings.Builder
buf.WriteString(fmt.Sprintf("n: %d", n))
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
assert.Panicsf(t, func() { fn() }, buf.String())
} else {
val, err := fn()
assert.Nil(t, err)
assert.Equal(t, squareSum, val.(int64))
}
})
}

View File

@@ -1,107 +0,0 @@
//go:build fuzz
// +build fuzz
package mr
import (
"fmt"
"math/rand"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/threading"
"gopkg.in/cheggaaa/pb.v1"
)
// If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
// so we need to simulate the fuzz test in test mode.
func TestMapReduceRandom(t *testing.T) {
rand.Seed(time.Now().UnixNano())
const (
times = 10000
nRange = 500
mega = 1024 * 1024
)
bar := pb.New(times).Start()
runner := threading.NewTaskRunner(runtime.NumCPU())
var wg sync.WaitGroup
wg.Add(times)
for i := 0; i < times; i++ {
runner.Schedule(func() {
start := time.Now()
defer func() {
if time.Since(start) > time.Minute {
t.Fatal("timeout")
}
wg.Done()
}()
t.Run(strconv.Itoa(i), func(t *testing.T) {
n := rand.Int63n(nRange)%nRange + nRange
workers := rand.Int()%50 + runtime.NumCPU()/2
genPanic := rand.Intn(100) == 0
mapperPanic := rand.Intn(100) == 0
reducerPanic := rand.Intn(100) == 0
genIdx := rand.Int63n(n)
mapperIdx := rand.Int63n(n)
reducerIdx := rand.Int63n(n)
squareSum := (n - 1) * n * (2*n - 1) / 6
fn := func() (interface{}, error) {
return MapReduce(func(source chan<- interface{}) {
for i := int64(0); i < n; i++ {
source <- i
if genPanic && i == genIdx {
panic("foo")
}
}
}, func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int64)
if mapperPanic && v == mapperIdx {
panic("bar")
}
writer.Write(v * v)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var idx int64
var total int64
for v := range pipe {
if reducerPanic && idx == reducerIdx {
panic("baz")
}
total += v.(int64)
idx++
}
writer.Write(total)
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
}
if genPanic || mapperPanic || reducerPanic {
var buf strings.Builder
buf.WriteString(fmt.Sprintf("n: %d", n))
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
assert.Panicsf(t, func() { fn() }, buf.String())
} else {
val, err := fn()
assert.Nil(t, err)
assert.Equal(t, squareSum, val.(int64))
}
bar.Increment()
})
})
}
wg.Wait()
bar.Finish()
}

View File

@@ -11,7 +11,8 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"go.uber.org/goleak" "github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/syncx"
) )
var errDummy = errors.New("dummy") var errDummy = errors.New("dummy")
@@ -21,8 +22,6 @@ func init() {
} }
func TestFinish(t *testing.T) { func TestFinish(t *testing.T) {
defer goleak.VerifyNone(t)
var total uint32 var total uint32
err := Finish(func() error { err := Finish(func() error {
atomic.AddUint32(&total, 2) atomic.AddUint32(&total, 2)
@@ -40,20 +39,14 @@ func TestFinish(t *testing.T) {
} }
func TestFinishNone(t *testing.T) { func TestFinishNone(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Nil(t, Finish()) assert.Nil(t, Finish())
} }
func TestFinishVoidNone(t *testing.T) { func TestFinishVoidNone(t *testing.T) {
defer goleak.VerifyNone(t)
FinishVoid() FinishVoid()
} }
func TestFinishErr(t *testing.T) { func TestFinishErr(t *testing.T) {
defer goleak.VerifyNone(t)
var total uint32 var total uint32
err := Finish(func() error { err := Finish(func() error {
atomic.AddUint32(&total, 2) atomic.AddUint32(&total, 2)
@@ -70,8 +63,6 @@ func TestFinishErr(t *testing.T) {
} }
func TestFinishVoid(t *testing.T) { func TestFinishVoid(t *testing.T) {
defer goleak.VerifyNone(t)
var total uint32 var total uint32
FinishVoid(func() { FinishVoid(func() {
atomic.AddUint32(&total, 2) atomic.AddUint32(&total, 2)
@@ -84,107 +75,70 @@ func TestFinishVoid(t *testing.T) {
assert.Equal(t, uint32(10), atomic.LoadUint32(&total)) assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
} }
func TestForEach(t *testing.T) { func TestMap(t *testing.T) {
const tasks = 1000 tests := []struct {
mapper MapFunc
expect int
}{
{
mapper: func(item interface{}, writer Writer) {
v := item.(int)
writer.Write(v * v)
},
expect: 30,
},
{
mapper: func(item interface{}, writer Writer) {
v := item.(int)
if v%2 == 0 {
return
}
writer.Write(v * v)
},
expect: 10,
},
{
mapper: func(item interface{}, writer Writer) {
v := item.(int)
if v%2 == 0 {
panic(v)
}
writer.Write(v * v)
},
expect: 10,
},
}
t.Run("all", func(t *testing.T) { for _, test := range tests {
defer goleak.VerifyNone(t) t.Run(stringx.Rand(), func(t *testing.T) {
channel := Map(func(source chan<- interface{}) {
var count uint32 for i := 1; i < 5; i++ {
ForEach(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}) {
atomic.AddUint32(&count, 1)
}, WithWorkers(-1))
assert.Equal(t, tasks, int(count))
})
t.Run("odd", func(t *testing.T) {
defer goleak.VerifyNone(t)
var count uint32
ForEach(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}) {
if item.(int)%2 == 0 {
atomic.AddUint32(&count, 1)
}
})
assert.Equal(t, tasks/2, int(count))
})
t.Run("all", func(t *testing.T) {
defer goleak.VerifyNone(t)
assert.PanicsWithValue(t, "foo", func() {
ForEach(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i source <- i
} }
}, func(item interface{}) { }, test.mapper, WithWorkers(-1))
panic("foo")
}) var result int
for v := range channel {
result += v.(int)
}
assert.Equal(t, test.expect, result)
}) })
}) }
}
func TestGeneratePanic(t *testing.T) {
defer goleak.VerifyNone(t)
t.Run("all", func(t *testing.T) {
assert.PanicsWithValue(t, "foo", func() {
ForEach(func(source chan<- interface{}) {
panic("foo")
}, func(item interface{}) {
})
})
})
}
func TestMapperPanic(t *testing.T) {
defer goleak.VerifyNone(t)
const tasks = 1000
var run int32
t.Run("all", func(t *testing.T) {
assert.PanicsWithValue(t, "foo", func() {
_, _ = MapReduce(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}, writer Writer, cancel func(error)) {
atomic.AddInt32(&run, 1)
panic("foo")
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
})
})
assert.True(t, atomic.LoadInt32(&run) < tasks/2)
})
} }
func TestMapReduce(t *testing.T) { func TestMapReduce(t *testing.T) {
defer goleak.VerifyNone(t)
tests := []struct { tests := []struct {
name string
mapper MapperFunc mapper MapperFunc
reducer ReducerFunc reducer ReducerFunc
expectErr error expectErr error
expectValue interface{} expectValue interface{}
}{ }{
{ {
name: "simple",
expectErr: nil, expectErr: nil,
expectValue: 30, expectValue: 30,
}, },
{ {
name: "cancel with error",
mapper: func(item interface{}, writer Writer, cancel func(error)) { mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int) v := item.(int)
if v%3 == 0 { if v%3 == 0 {
@@ -195,7 +149,6 @@ func TestMapReduce(t *testing.T) {
expectErr: errDummy, expectErr: errDummy,
}, },
{ {
name: "cancel with nil",
mapper: func(item interface{}, writer Writer, cancel func(error)) { mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int) v := item.(int)
if v%3 == 0 { if v%3 == 0 {
@@ -207,7 +160,6 @@ func TestMapReduce(t *testing.T) {
expectValue: nil, expectValue: nil,
}, },
{ {
name: "cancel with more",
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) { reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var result int var result int
for item := range pipe { for item := range pipe {
@@ -222,74 +174,36 @@ func TestMapReduce(t *testing.T) {
}, },
} }
t.Run("MapReduce", func(t *testing.T) { for _, test := range tests {
for _, test := range tests { t.Run(stringx.Rand(), func(t *testing.T) {
t.Run(test.name, func(t *testing.T) { if test.mapper == nil {
if test.mapper == nil { test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
test.mapper = func(item interface{}, writer Writer, cancel func(error)) { v := item.(int)
v := item.(int) writer.Write(v * v)
writer.Write(v * v)
}
} }
if test.reducer == nil { }
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) { if test.reducer == nil {
var result int test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
for item := range pipe { var result int
result += item.(int) for item := range pipe {
} result += item.(int)
writer.Write(result)
} }
writer.Write(result)
} }
value, err := MapReduce(func(source chan<- interface{}) { }
for i := 1; i < 5; i++ { value, err := MapReduce(func(source chan<- interface{}) {
source <- i for i := 1; i < 5; i++ {
} source <- i
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
assert.Equal(t, test.expectErr, err)
assert.Equal(t, test.expectValue, value)
})
}
})
t.Run("MapReduce", func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mapper == nil {
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
writer.Write(v * v)
}
}
if test.reducer == nil {
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var result int
for item := range pipe {
result += item.(int)
}
writer.Write(result)
}
} }
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
source := make(chan interface{}) assert.Equal(t, test.expectErr, err)
go func() { assert.Equal(t, test.expectValue, value)
for i := 1; i < 5; i++ { })
source <- i }
}
close(source)
}()
value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
assert.Equal(t, test.expectErr, err)
assert.Equal(t, test.expectValue, value)
})
}
})
} }
func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) { func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Panics(t, func() { assert.Panics(t, func() {
MapReduce(func(source chan<- interface{}) { MapReduce(func(source chan<- interface{}) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
@@ -306,23 +220,18 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
} }
func TestMapReduceVoid(t *testing.T) { func TestMapReduceVoid(t *testing.T) {
defer goleak.VerifyNone(t)
var value uint32 var value uint32
tests := []struct { tests := []struct {
name string
mapper MapperFunc mapper MapperFunc
reducer VoidReducerFunc reducer VoidReducerFunc
expectValue uint32 expectValue uint32
expectErr error expectErr error
}{ }{
{ {
name: "simple",
expectValue: 30, expectValue: 30,
expectErr: nil, expectErr: nil,
}, },
{ {
name: "cancel with error",
mapper: func(item interface{}, writer Writer, cancel func(error)) { mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int) v := item.(int)
if v%3 == 0 { if v%3 == 0 {
@@ -333,7 +242,6 @@ func TestMapReduceVoid(t *testing.T) {
expectErr: errDummy, expectErr: errDummy,
}, },
{ {
name: "cancel with nil",
mapper: func(item interface{}, writer Writer, cancel func(error)) { mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int) v := item.(int)
if v%3 == 0 { if v%3 == 0 {
@@ -344,7 +252,6 @@ func TestMapReduceVoid(t *testing.T) {
expectErr: ErrCancelWithNil, expectErr: ErrCancelWithNil,
}, },
{ {
name: "cancel with more",
reducer: func(pipe <-chan interface{}, cancel func(error)) { reducer: func(pipe <-chan interface{}, cancel func(error)) {
for item := range pipe { for item := range pipe {
result := atomic.AddUint32(&value, uint32(item.(int))) result := atomic.AddUint32(&value, uint32(item.(int)))
@@ -358,7 +265,7 @@ func TestMapReduceVoid(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(stringx.Rand(), func(t *testing.T) {
atomic.StoreUint32(&value, 0) atomic.StoreUint32(&value, 0)
if test.mapper == nil { if test.mapper == nil {
@@ -389,8 +296,6 @@ func TestMapReduceVoid(t *testing.T) {
} }
func TestMapReduceVoidWithDelay(t *testing.T) { func TestMapReduceVoidWithDelay(t *testing.T) {
defer goleak.VerifyNone(t)
var result []int var result []int
err := MapReduceVoid(func(source chan<- interface{}) { err := MapReduceVoid(func(source chan<- interface{}) {
source <- 0 source <- 0
@@ -413,64 +318,38 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
assert.Equal(t, 0, result[1]) assert.Equal(t, 0, result[1])
} }
func TestMapVoid(t *testing.T) {
const tasks = 1000
var count uint32
MapVoid(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}) {
atomic.AddUint32(&count, 1)
})
assert.Equal(t, tasks, int(count))
}
func TestMapReducePanic(t *testing.T) { func TestMapReducePanic(t *testing.T) {
defer goleak.VerifyNone(t) v, err := MapReduce(func(source chan<- interface{}) {
source <- 0
assert.Panics(t, func() { source <- 1
_, _ = MapReduce(func(source chan<- interface{}) { }, func(item interface{}, writer Writer, cancel func(error)) {
source <- 0 i := item.(int)
source <- 1 writer.Write(i)
}, func(item interface{}, writer Writer, cancel func(error)) { }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
i := item.(int) for range pipe {
writer.Write(i) panic("panic")
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) { }
for range pipe {
panic("panic")
}
})
})
}
func TestMapReducePanicOnce(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- interface{}) {
for i := 0; i < 100; i++ {
source <- i
}
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
if i == 0 {
panic("foo")
}
writer.Write(i)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
for range pipe {
panic("bar")
}
})
})
}
func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- interface{}) {
source <- 0
source <- 1
}, func(item interface{}, writer Writer, cancel func(error)) {
panic("foo")
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
panic("bar")
})
}) })
assert.Nil(t, v)
assert.NotNil(t, err)
assert.Equal(t, "panic", err.Error())
} }
func TestMapReduceVoidCancel(t *testing.T) { func TestMapReduceVoidCancel(t *testing.T) {
defer goleak.VerifyNone(t)
var result []int var result []int
err := MapReduceVoid(func(source chan<- interface{}) { err := MapReduceVoid(func(source chan<- interface{}) {
source <- 0 source <- 0
@@ -492,15 +371,13 @@ func TestMapReduceVoidCancel(t *testing.T) {
} }
func TestMapReduceVoidCancelWithRemains(t *testing.T) { func TestMapReduceVoidCancelWithRemains(t *testing.T) {
defer goleak.VerifyNone(t) var done syncx.AtomicBool
var done int32
var result []int var result []int
err := MapReduceVoid(func(source chan<- interface{}) { err := MapReduceVoid(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ { for i := 0; i < defaultWorkers*2; i++ {
source <- i source <- i
} }
atomic.AddInt32(&done, 1) done.Set(true)
}, func(item interface{}, writer Writer, cancel func(error)) { }, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int) i := item.(int)
if i == defaultWorkers/2 { if i == defaultWorkers/2 {
@@ -515,12 +392,10 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
}) })
assert.NotNil(t, err) assert.NotNil(t, err)
assert.Equal(t, "anything", err.Error()) assert.Equal(t, "anything", err.Error())
assert.Equal(t, int32(1), done) assert.True(t, done.True())
} }
func TestMapReduceWithoutReducerWrite(t *testing.T) { func TestMapReduceWithoutReducerWrite(t *testing.T) {
defer goleak.VerifyNone(t)
uids := []int{1, 2, 3} uids := []int{1, 2, 3}
res, err := MapReduce(func(source chan<- interface{}) { res, err := MapReduce(func(source chan<- interface{}) {
for _, uid := range uids { for _, uid := range uids {
@@ -537,54 +412,33 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) {
} }
func TestMapReduceVoidPanicInReducer(t *testing.T) { func TestMapReduceVoidPanicInReducer(t *testing.T) {
defer goleak.VerifyNone(t)
const message = "foo" const message = "foo"
assert.Panics(t, func() { var done syncx.AtomicBool
var done int32 err := MapReduceVoid(func(source chan<- interface{}) {
_ = MapReduceVoid(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ {
source <- i
}
atomic.AddInt32(&done, 1)
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
writer.Write(i)
}, func(pipe <-chan interface{}, cancel func(error)) {
panic(message)
}, WithWorkers(1))
})
}
func TestForEachWithContext(t *testing.T) {
defer goleak.VerifyNone(t)
var done int32
ctx, cancel := context.WithCancel(context.Background())
ForEach(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ { for i := 0; i < defaultWorkers*2; i++ {
source <- i source <- i
} }
atomic.AddInt32(&done, 1) done.Set(true)
}, func(item interface{}) { }, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int) i := item.(int)
if i == defaultWorkers/2 { writer.Write(i)
cancel() }, func(pipe <-chan interface{}, cancel func(error)) {
} panic(message)
}, WithContext(ctx)) }, WithWorkers(1))
assert.NotNil(t, err)
assert.Equal(t, message, err.Error())
assert.True(t, done.True())
} }
func TestMapReduceWithContext(t *testing.T) { func TestMapReduceWithContext(t *testing.T) {
defer goleak.VerifyNone(t) var done syncx.AtomicBool
var done int32
var result []int var result []int
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
err := MapReduceVoid(func(source chan<- interface{}) { err := MapReduceVoid(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ { for i := 0; i < defaultWorkers*2; i++ {
source <- i source <- i
} }
atomic.AddInt32(&done, 1) done.Set(true)
}, func(item interface{}, writer Writer, c func(error)) { }, func(item interface{}, writer Writer, c func(error)) {
i := item.(int) i := item.(int)
if i == defaultWorkers/2 { if i == defaultWorkers/2 {
@@ -598,7 +452,7 @@ func TestMapReduceWithContext(t *testing.T) {
} }
}, WithContext(ctx)) }, WithContext(ctx))
assert.NotNil(t, err) assert.NotNil(t, err)
assert.Equal(t, context.DeadlineExceeded, err) assert.Equal(t, ErrReduceNoOutput, err)
} }
func BenchmarkMapReduce(b *testing.B) { func BenchmarkMapReduce(b *testing.B) {

View File

@@ -54,7 +54,7 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/zeromicro/go-zero/core/mr" "github.com/tal-tech/go-zero/core/mr"
) )
func main() { func main() {

View File

@@ -55,7 +55,7 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/zeromicro/go-zero/core/mr" "github.com/tal-tech/go-zero/core/mr"
) )
func main() { func main() {
@@ -87,4 +87,4 @@ More examples: [https://github.com/zeromicro/zero-examples/tree/main/mapreduce](
## Give a Star! ⭐ ## Give a Star! ⭐
If you like or are using this project to learn or start your solution, please give it a star. Thanks! If you like or are using this project to learn or start your solution, please give it a star. Thanks!

View File

@@ -11,7 +11,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
) )
const ( const (

View File

@@ -15,7 +15,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
) )
// DefaultMemProfileRate is the default memory profiling rate. // DefaultMemProfileRate is the default memory profiling rate.

View File

@@ -10,8 +10,8 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/zeromicro/go-zero/core/logx" "github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/threading" "github.com/tal-tech/go-zero/core/threading"
) )
const ( const (

View File

@@ -1,6 +1,3 @@
//go:build linux || darwin
// +build linux darwin
package proc package proc
import ( import (

Some files were not shown because too many files have changed in this diff Show More