Compare commits


1 Commit

Author: spectatorMrZ
SHA1: 304fb182bb
Message: Fix pg model generation without tag (#1407)
  1. fix pg model structs generated without tags
  2. add pg command test from datasource
Date: 2022-01-08 21:48:09 +08:00
498 changed files with 2548 additions and 7505 deletions
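
The commit addresses `goctl model pg` emitting Postgres model structs whose fields carry no tags. A minimal sketch of the expected shape of a tagged model struct, assuming the usual `db` tag convention; the type and column names below are illustrative, not taken from the generated code:

```go
package model

import "database/sql"

// Users is an illustrative, hand-written example of a tagged Postgres model
// struct. The point of the fix is that every generated field carries a `db`
// tag; the concrete columns here are hypothetical.
type Users struct {
	Id         int64          `db:"id"`
	Name       string         `db:"name"`
	Nickname   sql.NullString `db:"nickname"`    // nullable column
	CreateTime sql.NullTime   `db:"create_time"` // nullable timestamp
}
```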

View File

@@ -7,50 +7,32 @@ on:
branches: [ master ]
jobs:
test-linux:
name: Linux
build:
name: Build
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.15
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.14
id: go
- name: Get dependencies
run: |
go get -v -t -d ./...
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Lint
run: |
go vet -stdmethods=false $(go list ./...)
go install mvdan.cc/gofumpt@latest
test -z "$(gofumpt -s -l -extra .)" || echo "Please run 'gofumpt -l -w -extra .'"
- name: Get dependencies
run: |
go get -v -t -d ./...
- name: Test
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
- name: Lint
run: |
go vet -stdmethods=false $(go list ./...)
go install mvdan.cc/gofumpt@latest
test -z "$(gofumpt -s -l -extra .)" || echo "Please run 'gofumpt -l -w -extra .'"
- name: Codecov
uses: codecov/codecov-action@v2
- name: Test
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
test-win:
name: Windows
runs-on: windows-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.15
- name: Checkout codebase
uses: actions/checkout@v2
- name: Test
run: |
go mod verify
go mod download
go test -v -race ./...
cd tools/goctl && go build -v goctl.go
- name: Codecov
uses: codecov/codecov-action@v2

View File

@@ -1,18 +0,0 @@
name: 'issue-translator'
on:
issue_comment:
types: [created]
issues:
types: [opened]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: tomsun28/issues-translate-action@v2.6
with:
IS_MODIFY_TITLE: true
# not required, default false. Decide whether to modify the issue title
# if true, the robot account @Issues-translate-bot must have modification permissions; invite @Issues-translate-bot to your project or use your custom bot.
CUSTOM_BOT_NOTE: Bot detected the issue body's language is not English, translate it automatically. 👯👭🏻🧑‍🤝‍🧑👫🧑🏿‍🤝‍🧑🏻👩🏾‍🤝‍👨🏿👬🏿
# not required. Customizes the translation bot's prefix message.

.gitignore (vendored), 3 changes
View File

@@ -16,8 +16,7 @@
**/logs
# for test purpose
**/adhoc
**/testdata
adhoc
# gitlab ci
.cache

View File

@@ -40,7 +40,7 @@ We will help you to contribute in different areas like filing issues, developing
getting your work reviewed and merged.
If you have questions about the development process,
feel free to [file an issue](https://github.com/zeromicro/go-zero/issues/new/choose).
feel free to [file an issue](https://github.com/tal-tech/go-zero/issues/new/choose).
## Find something to work on
@@ -50,10 +50,10 @@ Here is how you get started.
### Find a good first topic
[go-zero](https://github.com/zeromicro/go-zero) has beginner-friendly issues that provide a good first issue.
For example, [go-zero](https://github.com/zeromicro/go-zero) has
[help wanted](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
[good first issue](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
[go-zero](https://github.com/tal-tech/go-zero) has beginner-friendly issues that provide a good first issue.
For example, [go-zero](https://github.com/tal-tech/go-zero) has
[help wanted](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
[good first issue](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
labels for issues that should not need deep knowledge of the system.
We can help new contributors who wish to work on such issues.
@@ -79,7 +79,7 @@ This is a rough outline of what a contributor's workflow looks like:
- Create a topic branch from where to base the contribution. This is usually master.
- Make commits of logical units.
- Push changes in a topic branch to a personal fork of the repository.
- Submit a pull request to [go-zero](https://github.com/zeromicro/go-zero).
- Submit a pull request to [go-zero](https://github.com/tal-tech/go-zero).
## Creating Pull Requests

View File

@@ -20,9 +20,9 @@ We hope that the items listed below will inspire further engagement from the com
- [x] Support `goctl bug` to report bugs conveniently
## 2022
- [x] Support `context` in redis related methods for timeout and tracing
- [x] Support `context` in sql related methods for timeout and tracing
- [ ] Support `context` in mongodb related methods for timeout and tracing
- [ ] Support `goctl mock` command to start a mocking server with given `.api` file
- [ ] Add `httpx.Client` with governance, like circuit breaker etc.
- [ ] Support `goctl doctor` command to report potential issues for given service
- [ ] Support `goctl mock` command to start a mocking server with given `.api` file
- [ ] Support `context` in redis related methods for timeout and tracing
- [ ] Support `context` in sql related methods for timeout and tracing
- [ ] Support `context` in mongodb related methods for timeout and tracing
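
The `context` roadmap items above refer to the standard pattern of threading a deadline-carrying context into store calls. A sketch of that pattern using the standard library's `database/sql` (not go-zero's own wrappers, which are what the roadmap proposes to extend):

```go
package store

import (
	"context"
	"database/sql"
	"fmt"
	"time"

	_ "github.com/lib/pq" // driver choice is illustrative
)

func queryUserName(db *sql.DB, id int64) (string, error) {
	// Bound the query with a deadline; the driver cancels the statement when
	// the context expires, which is the behavior the roadmap items want the
	// redis/sql/mongo wrappers to expose.
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	var name string
	err := db.QueryRowContext(ctx, "SELECT name FROM users WHERE id = $1", id).Scan(&name)
	if err != nil {
		return "", fmt.Errorf("query user %d: %w", id, err)
	}
	return name, nil
}
```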

View File

@@ -4,8 +4,8 @@ import (
"errors"
"strconv"
"github.com/zeromicro/go-zero/core/hash"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/hash"
"github.com/tal-tech/go-zero/core/stores/redis"
)
const (

View File

@@ -4,7 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
)
func TestRedisBitSet_New_Set_Test(t *testing.T) {

View File

@@ -6,11 +6,11 @@ import (
"strings"
"sync"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/timex"
)
const (

View File

@@ -8,7 +8,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/stat"
)
func init() {

View File

@@ -6,7 +6,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/stat"
)
func init() {

View File

@@ -4,8 +4,8 @@ import (
"math"
"time"
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/mathx"
)
const (

View File

@@ -7,9 +7,9 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/stat"
)
const (

View File

@@ -8,8 +8,8 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/iox"
"github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/lang"
)
func TestEnterToContinue(t *testing.T) {

View File

@@ -7,7 +7,7 @@ import (
"encoding/base64"
"errors"
"github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/logx"
)
// ErrPaddingSize indicates bad padding size.

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/fs"
)
const (

View File

@@ -6,9 +6,9 @@ import (
"sync/atomic"
"time"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/syncx"
)
const (

View File

@@ -61,41 +61,3 @@ func TestPutMore(t *testing.T) {
assert.Equal(t, string(element), string(body.([]byte)))
}
}
func TestPutMoreWithHeaderNotZero(t *testing.T) {
elements := [][]byte{
[]byte("hello"),
[]byte("world"),
[]byte("again"),
}
queue := NewQueue(4)
for i := range elements {
queue.Put(elements[i])
}
// take 1
body, ok := queue.Take()
assert.True(t, ok)
element, ok := body.([]byte)
assert.True(t, ok)
assert.Equal(t, element, []byte("hello"))
// put more
queue.Put([]byte("b4"))
queue.Put([]byte("b5")) // will store in elements[0]
queue.Put([]byte("b6")) // cause expansion
results := [][]byte{
[]byte("world"),
[]byte("again"),
[]byte("b4"),
[]byte("b5"),
[]byte("b6"),
}
for _, element := range results {
body, ok := queue.Take()
assert.True(t, ok)
assert.Equal(t, string(element), string(body.([]byte)))
}
}

View File

@@ -4,7 +4,7 @@ import (
"sync"
"time"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/timex"
)
type (

View File

@@ -6,7 +6,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/stringx"
)
const duration = time.Millisecond * 50

View File

@@ -4,7 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/stringx"
)
func TestSafeMap(t *testing.T) {

View File

@@ -1,8 +1,8 @@
package collection
import (
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
const (

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/logx"
)
func init() {

View File

@@ -5,9 +5,9 @@ import (
"fmt"
"time"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/timex"
)
const drainWorkers = 8

View File

@@ -8,10 +8,10 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex"
)
const (

View File

@@ -7,7 +7,7 @@ import (
"os"
"path"
"github.com/zeromicro/go-zero/core/mapping"
"github.com/tal-tech/go-zero/core/mapping"
)
var loaders = map[string]func([]byte, interface{}) error{

View File

@@ -6,8 +6,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/hash"
"github.com/tal-tech/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/hash"
)
func TestLoadConfig_notExists(t *testing.T) {

View File

@@ -7,7 +7,7 @@ import (
"strings"
"sync"
"github.com/zeromicro/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/iox"
)
// PropertyError represents a configuration error message.

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/fs"
)
func TestProperties(t *testing.T) {

View File

@@ -3,7 +3,7 @@ package contextx
import (
"context"
"github.com/zeromicro/go-zero/core/mapping"
"github.com/tal-tech/go-zero/core/mapping"
)
const contextTagKey = "ctx"

View File

@@ -1,6 +1,6 @@
package discov
import "github.com/zeromicro/go-zero/core/discov/internal"
import "github.com/tal-tech/go-zero/core/discov/internal"
// RegisterAccount registers the username/password to the given etcd cluster.
func RegisterAccount(endpoints []string, user, pass string) {

View File

@@ -4,8 +4,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/stringx"
)
func TestRegisterAccount(t *testing.T) {

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"strings"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/discov/internal"
)
const (

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/discov/internal"
)
var mockLock sync.Mutex

View File

@@ -2,13 +2,6 @@ package discov
import "errors"
var (
// errEmptyEtcdHosts indicates that etcd hosts are empty.
errEmptyEtcdHosts = errors.New("empty etcd hosts")
// errEmptyEtcdKey indicates that etcd key is empty.
errEmptyEtcdKey = errors.New("empty etcd key")
)
// EtcdConf is the config item with the given key on etcd.
type EtcdConf struct {
Hosts []string
@@ -34,9 +27,9 @@ func (c EtcdConf) HasTLS() bool {
// Validate validates c.
func (c EtcdConf) Validate() error {
if len(c.Hosts) == 0 {
return errEmptyEtcdHosts
return errors.New("empty etcd hosts")
} else if len(c.Key) == 0 {
return errEmptyEtcdKey
return errors.New("empty etcd key")
} else {
return nil
}

View File

@@ -4,7 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/stringx"
)
func TestAccount(t *testing.T) {

View File

@@ -9,11 +9,11 @@ import (
"sync"
"time"
"github.com/zeromicro/go-zero/core/contextx"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/contextx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
clientv3 "go.etcd.io/etcd/client/v3"
)

View File

@@ -7,10 +7,10 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/contextx"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/contextx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stringx"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)

View File

@@ -1,12 +1,12 @@
package discov
import (
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
clientv3 "go.etcd.io/etcd/client/v3"
)

View File

@@ -8,10 +8,10 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stringx"
clientv3 "go.etcd.io/etcd/client/v3"
)

View File

@@ -4,9 +4,9 @@ import (
"sync"
"sync/atomic"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx"
)
type (

View File

@@ -5,8 +5,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/stringx"
)
const (

View File

@@ -11,12 +11,10 @@ type (
errorArray []error
)
// Add adds errs to be, nil errors are ignored.
func (be *BatchError) Add(errs ...error) {
for _, err := range errs {
if err != nil {
be.errs = append(be.errs, err)
}
// Add adds err to be.
func (be *BatchError) Add(err error) {
if err != nil {
be.errs = append(be.errs, err)
}
}
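
The hunk above is where `BatchError.Add` switches between a single-error and a variadic signature. A self-contained sketch of the variadic form with nil errors skipped; only the `Add` shape is taken from the diff, the `Err` helper is a simplification:

```go
package errorx

import (
	"errors"
	"strings"
)

// BatchError collects multiple errors and exposes them as a single error.
// Simplified sketch of the type touched above.
type BatchError struct {
	errs []error
}

// Add appends errs to be; nil errors are ignored (the variadic form in the hunk).
func (be *BatchError) Add(errs ...error) {
	for _, err := range errs {
		if err != nil {
			be.errs = append(be.errs, err)
		}
	}
}

// Err joins the collected errors into one error, or returns nil if none were added.
func (be *BatchError) Err() error {
	if len(be.errs) == 0 {
		return nil
	}
	msgs := make([]string, 0, len(be.errs))
	for _, err := range be.errs {
		msgs = append(msgs, err.Error())
	}
	return errors.New(strings.Join(msgs, "; "))
}
```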

View File

@@ -4,7 +4,7 @@ import (
"sync"
"time"
"github.com/zeromicro/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/threading"
)
// A DelayExecutor delays tasks by a given delay interval.

View File

@@ -3,8 +3,8 @@ package executors
import (
"time"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex"
)
// A LessExecutor is an executor to limit execution once within given time interval.

View File

@@ -5,7 +5,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/timex"
)
func TestLessExecutor_DoOrDiscard(t *testing.T) {

View File

@@ -6,11 +6,11 @@ import (
"sync/atomic"
"time"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/timex"
)
const idleRound = 10

View File

@@ -8,7 +8,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/timex"
)
const threshold = 10

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/fs"
)
const (

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/fs"
)
func TestSplitLineChunks(t *testing.T) {

View File

@@ -5,9 +5,6 @@ import (
"os"
)
// errExceedFileSize indicates that the file size is exceeded.
var errExceedFileSize = errors.New("exceed file size")
// A RangeReader is used to read a range of content from a file.
type RangeReader struct {
file *os.File
@@ -32,7 +29,7 @@ func (rr *RangeReader) Read(p []byte) (n int, err error) {
}
if rr.stop < rr.start || rr.start >= stat.Size() {
return 0, errExceedFileSize
return 0, errors.New("exceed file size")
}
if rr.stop-rr.start < int64(len(p)) {

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/fs"
)
func TestRangeReader(t *testing.T) {

View File

@@ -4,7 +4,7 @@ import (
"io/ioutil"
"os"
"github.com/zeromicro/go-zero/core/hash"
"github.com/tal-tech/go-zero/core/hash"
)
// TempFileWithText creates the temporary file with the given content,

View File

@@ -1,6 +1,6 @@
package fx
import "github.com/zeromicro/go-zero/core/threading"
import "github.com/tal-tech/go-zero/core/threading"
// Parallel runs fns parallelly and waits for done.
func Parallel(fns ...func()) {

View File

@@ -1,6 +1,6 @@
package fx
import "github.com/zeromicro/go-zero/core/errorx"
import "github.com/tal-tech/go-zero/core/errorx"
const defaultRetryTimes = 3

View File

@@ -4,9 +4,9 @@ import (
"sort"
"sync"
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/threading"
)
const (

View File

@@ -13,8 +13,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx"
"go.uber.org/goleak"
"github.com/tal-tech/go-zero/core/stringx"
)
func TestBuffer(t *testing.T) {
@@ -564,6 +563,9 @@ func equal(t *testing.T, stream Stream, data []interface{}) {
}
func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
defer goleak.VerifyNone(t)
goroutines := runtime.NumGoroutine()
fn(t)
// let scheduler schedule first
time.Sleep(time.Millisecond)
assert.True(t, runtime.NumGoroutine() <= goroutines)
}

View File

@@ -6,8 +6,8 @@ import (
"strconv"
"sync"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/mapping"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/mapping"
)
const (

View File

@@ -6,7 +6,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/mathx"
)
const (

View File

@@ -9,8 +9,8 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/stringx"
)
func TestReadText(t *testing.T) {

View File

@@ -51,5 +51,5 @@ func unmarshalUseNumber(decoder *json.Decoder, v interface{}) error {
}
func formatError(v string, err error) error {
return fmt.Errorf("string: `%s`, error: `%w`", v, err)
return fmt.Errorf("string: `%s`, error: `%s`", v, err.Error())
}
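
The `formatError` change swaps `%s` plus `err.Error()` for the `%w` verb. The practical difference is that `%w` keeps the original error in the chain so callers can still match it; a small illustration:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	wrapped := fmt.Errorf("string: `%s`, error: `%w`", "{bad json", io.ErrUnexpectedEOF)
	flat := fmt.Errorf("string: `%s`, error: `%s`", "{bad json", io.ErrUnexpectedEOF.Error())

	// %w preserves the wrapped error for errors.Is / errors.As; %s flattens it to text.
	fmt.Println(errors.Is(wrapped, io.ErrUnexpectedEOF)) // true
	fmt.Println(errors.Is(flat, io.ErrUnexpectedEOF))    // false
}
```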

View File

@@ -5,11 +5,12 @@ import (
"strconv"
"time"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/stores/redis"
)
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
const periodScript = `local limit = tonumber(ARGV[1])
const (
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
periodScript = `local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call("INCRBY", KEYS[1], 1)
if current == 1 then
@@ -22,6 +23,8 @@ elseif current == limit then
else
return 0
end`
zoneDiff = 3600 * 8 // GMT+8 for our services
)
const (
// Unknown means not initialized state.
@@ -101,9 +104,7 @@ func (h *PeriodLimit) Take(key string) (int, error) {
func (h *PeriodLimit) calcExpireSeconds() int {
if h.align {
now := time.Now()
_, offset := now.Zone()
unix := now.Unix() + int64(offset)
unix := time.Now().Unix() + zoneDiff
return h.period - int(unix%int64(h.period))
}
@@ -111,8 +112,6 @@ func (h *PeriodLimit) calcExpireSeconds() int {
}
// Align returns a func to customize a PeriodLimit with alignment.
// For example, if we want to limit end users with 5 sms verification messages every day,
// we need to align with the local timezone and the start of the day.
func Align() PeriodOption {
return func(l *PeriodLimit) {
l.align = true
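
The `calcExpireSeconds` hunk trades a hard-coded `zoneDiff = 3600 * 8` (GMT+8) for the local zone offset when `Align()` is set, so the window ends at the local start of day. The calculation in isolation, as a standalone sketch:

```go
package main

import (
	"fmt"
	"time"
)

// alignedExpireSeconds returns how many seconds remain in the current window
// of length period (in seconds), aligned to local time instead of a
// hard-coded GMT+8 offset.
func alignedExpireSeconds(period int) int {
	now := time.Now()
	_, offset := now.Zone() // local UTC offset in seconds
	unix := now.Unix() + int64(offset)
	return period - int(unix%int64(period))
}

func main() {
	// For a daily quota (86400s) this is the number of seconds until local midnight.
	fmt.Println(alignedExpireSeconds(86400))
}
```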

View File

@@ -5,8 +5,8 @@ import (
"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
)
func TestPeriodLimit_Take(t *testing.T) {
@@ -23,9 +23,10 @@ func TestPeriodLimit_RedisUnavailable(t *testing.T) {
const (
seconds = 1
total = 100
quota = 5
)
l := NewPeriodLimit(seconds, quota, redis.New(s.Addr()), "periodlimit")
l := NewPeriodLimit(seconds, quota, redis.NewRedis(s.Addr(), redis.NodeType), "periodlimit")
s.Close()
val, err := l.Take("first")
assert.NotNil(t, err)

View File

@@ -7,8 +7,8 @@ import (
"sync/atomic"
"time"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/redis"
xrate "golang.org/x/time/rate"
)
@@ -85,8 +85,8 @@ func (lim *TokenLimiter) Allow() bool {
}
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
// Use this method if you intend to drop / skip events that exceed the rate rate.
// Otherwise use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n)
}
@@ -112,8 +112,7 @@ func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
// Lua boolean false -> r Nil bulk reply
if err == redis.Nil {
return false
}
if err != nil {
} else if err != nil {
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
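
When the redis call fails, `reserveN` above falls back to an in-process `golang.org/x/time/rate` limiter (`rescueLimiter`). A minimal sketch of that rescue limiter on its own; the rate and burst values are arbitrary:

```go
package main

import (
	"fmt"
	"time"

	xrate "golang.org/x/time/rate"
)

func main() {
	// 10 tokens per second with a burst of 20, roughly the shape of the
	// in-process limiter used when redis is unreachable.
	rescue := xrate.NewLimiter(xrate.Every(100*time.Millisecond), 20)

	allowed := 0
	for i := 0; i < 100; i++ {
		if rescue.AllowN(time.Now(), 1) {
			allowed++
		}
	}
	// Called back-to-back, roughly the burst size (20) is admitted.
	fmt.Println("allowed:", allowed)
}
```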

View File

@@ -6,9 +6,9 @@ import (
"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
)
func init() {

View File

@@ -7,11 +7,11 @@ import (
"sync/atomic"
"time"
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex"
)
const (

View File

@@ -8,11 +8,11 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/syncx"
)
const (

View File

@@ -3,7 +3,7 @@ package load
import (
"io"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/syncx"
)
// A ShedderGroup is a manager to manage key based shedders.

View File

@@ -4,8 +4,8 @@ import (
"sync/atomic"
"time"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stat"
)
type (

View File

@@ -3,11 +3,10 @@ package logx
// A LogConf is a logging config.
type LogConf struct {
ServiceName string `json:",optional"`
Mode string `json:",default=console,options=[console,file,volume]"`
Encoding string `json:",default=json,options=[json,plain]"`
Mode string `json:",default=console,options=console|file|volume"`
TimeFormat string `json:",optional"`
Path string `json:",default=logs"`
Level string `json:",default=info,options=[info,error,severe]"`
Level string `json:",default=info,options=info|error|severe"`
Compress bool `json:",optional"`
KeepDays int `json:",optional"`
StackCooldownMillis int `json:",default=100"`

View File

@@ -3,10 +3,9 @@ package logx
import (
"fmt"
"io"
"sync/atomic"
"time"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/timex"
)
const durationCallerDepth = 3
@@ -80,15 +79,10 @@ func (l *durationLogger) WithDuration(duration time.Duration) Logger {
}
func (l *durationLogger) write(writer io.Writer, level string, val interface{}) {
switch atomic.LoadUint32(&encoding) {
case plainEncodingType:
writePlainAny(writer, level, val, l.Duration)
default:
outputJson(writer, &durationLogger{
Timestamp: getTimestamp(),
Level: level,
Content: val,
Duration: l.Duration,
})
}
outputJson(writer, &durationLogger{
Timestamp: getTimestamp(),
Level: level,
Content: val,
Duration: l.Duration,
})
}

View File

@@ -3,7 +3,6 @@ package logx
import (
"log"
"strings"
"sync/atomic"
"testing"
"time"
@@ -38,19 +37,6 @@ func TestWithDurationInfo(t *testing.T) {
assert.True(t, strings.Contains(builder.String(), "duration"), builder.String())
}
func TestWithDurationInfoConsole(t *testing.T) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
var builder strings.Builder
log.SetOutput(&builder)
WithDuration(time.Second).Info("foo")
assert.True(t, strings.Contains(builder.String(), "ms"), builder.String())
}
func TestWithDurationInfof(t *testing.T) {
var builder strings.Builder
log.SetOutput(&builder)

View File

@@ -4,8 +4,8 @@ import (
"sync/atomic"
"time"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex"
)
type limitedExecutor struct {

View File

@@ -6,7 +6,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/timex"
)
func TestLimitedExecutor_logOrDiscard(t *testing.T) {

View File

@@ -1,7 +1,6 @@
package logx
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -18,9 +17,9 @@ import (
"sync/atomic"
"time"
"github.com/zeromicro/go-zero/core/iox"
"github.com/zeromicro/go-zero/core/sysx"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/sysx"
"github.com/tal-tech/go-zero/core/timex"
)
const (
@@ -32,15 +31,6 @@ const (
SevereLevel
)
const (
jsonEncodingType = iota
plainEncodingType
jsonEncoding = "json"
plainEncoding = "plain"
plainEncodingSep = '\t'
)
const (
accessFilename = "access.log"
errorFilename = "error.log"
@@ -72,10 +62,9 @@ var (
// ErrLogServiceNameNotSet is an error that indicates that the service name is not set.
ErrLogServiceNameNotSet = errors.New("log service name must be set")
timeFormat = "2006-01-02T15:04:05.000Z07:00"
timeFormat = "2006-01-02T15:04:05.000Z07"
writeConsole bool
logLevel uint32
encoding uint32 = jsonEncodingType
// use uint32 for atomic operations
disableStat uint32
infoLog io.WriteCloser
@@ -135,12 +124,6 @@ func SetUp(c LogConf) error {
if len(c.TimeFormat) > 0 {
timeFormat = c.TimeFormat
}
switch c.Encoding {
case plainEncoding:
atomic.StoreUint32(&encoding, plainEncodingType)
default:
atomic.StoreUint32(&encoding, jsonEncodingType)
}
switch c.Mode {
case consoleMode:
@@ -424,31 +407,21 @@ func infoTextSync(msg string) {
}
func outputAny(writer io.Writer, level string, val interface{}) {
switch atomic.LoadUint32(&encoding) {
case plainEncodingType:
writePlainAny(writer, level, val)
default:
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: val,
}
outputJson(writer, info)
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: val,
}
outputJson(writer, info)
}
func outputText(writer io.Writer, level, msg string) {
switch atomic.LoadUint32(&encoding) {
case plainEncodingType:
writePlainText(writer, level, msg)
default:
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: msg,
}
outputJson(writer, info)
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: msg,
}
outputJson(writer, info)
}
func outputError(writer io.Writer, msg string, callDepth int) {
@@ -592,62 +565,6 @@ func statSync(msg string) {
}
}
func writePlainAny(writer io.Writer, level string, val interface{}, fields ...string) {
switch v := val.(type) {
case string:
writePlainText(writer, level, v, fields...)
case error:
writePlainText(writer, level, v.Error(), fields...)
case fmt.Stringer:
writePlainText(writer, level, v.String(), fields...)
default:
var buf bytes.Buffer
buf.WriteString(getTimestamp())
buf.WriteByte(plainEncodingSep)
buf.WriteString(level)
for _, item := range fields {
buf.WriteByte(plainEncodingSep)
buf.WriteString(item)
}
buf.WriteByte(plainEncodingSep)
if err := json.NewEncoder(&buf).Encode(val); err != nil {
log.Println(err.Error())
return
}
buf.WriteByte('\n')
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
log.Println(buf.String())
return
}
if _, err := writer.Write(buf.Bytes()); err != nil {
log.Println(err.Error())
}
}
}
func writePlainText(writer io.Writer, level, msg string, fields ...string) {
var buf bytes.Buffer
buf.WriteString(getTimestamp())
buf.WriteByte(plainEncodingSep)
buf.WriteString(level)
for _, item := range fields {
buf.WriteByte(plainEncodingSep)
buf.WriteString(item)
}
buf.WriteByte(plainEncodingSep)
buf.WriteString(msg)
buf.WriteByte('\n')
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
log.Println(buf.String())
return
}
if _, err := writer.Write(buf.Bytes()); err != nil {
log.Println(err.Error())
}
}
type logWriter struct {
logger *log.Logger
}

View File

@@ -141,78 +141,6 @@ func TestStructedLogInfov(t *testing.T) {
})
}
func TestStructedLogInfoConsoleAny(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(v)
})
}
func TestStructedLogInfoConsoleAnyString(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(fmt.Sprint(v...))
})
}
func TestStructedLogInfoConsoleAnyError(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(errors.New(fmt.Sprint(v...)))
})
}
func TestStructedLogInfoConsoleAnyStringer(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(ValStringer{
val: fmt.Sprint(v...),
})
})
}
func TestStructedLogInfoConsoleText(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Info(fmt.Sprint(v...))
})
}
func TestStructedLogSlow(t *testing.T) {
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
slowLog = writer
@@ -504,17 +432,6 @@ func doTestStructedLog(t *testing.T, level string, setup func(writer io.WriteClo
assert.True(t, strings.Contains(val, message))
}
func doTestStructedLogConsole(t *testing.T, setup func(writer io.WriteCloser),
write func(...interface{})) {
const message = "hello there"
writer := new(mockWriter)
setup(writer)
atomic.StoreUint32(&initialized, 1)
write(message)
println(writer.String())
assert.True(t, strings.Contains(writer.String(), message))
}
func testSetLevelTwiceWithMode(t *testing.T, mode string) {
SetUp(LogConf{
Mode: mode,
@@ -539,11 +456,3 @@ func testSetLevelTwiceWithMode(t *testing.T, mode string) {
ErrorStackf(message)
assert.Equal(t, 0, writer.builder.Len())
}
type ValStringer struct {
val string
}
func (v ValStringer) String() string {
return v.val
}

View File

@@ -13,9 +13,9 @@ import (
"sync"
"time"
"github.com/zeromicro/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/timex"
)
const (

View File

@@ -8,7 +8,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/fs"
)
func TestDailyRotateRuleMarkRotated(t *testing.T) {

View File

@@ -29,9 +29,9 @@ func TestRedirector(t *testing.T) {
}
func captureOutput(f func()) string {
atomic.StoreUint32(&initialized, 1)
writer := new(mockWriter)
infoLog = writer
atomic.StoreUint32(&initialized, 1)
prevLevel := atomic.LoadUint32(&logLevel)
SetLevel(InfoLevel)
@@ -44,9 +44,5 @@ func captureOutput(f func()) string {
func getContent(jsonStr string) string {
var entry logEntry
json.Unmarshal([]byte(jsonStr), &entry)
val, ok := entry.Content.(string)
if ok {
return val
}
return ""
return entry.Content.(string)
}
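
The `getContent` change guards the `entry.Content.(string)` assertion with the comma-ok form so a non-string payload returns an empty string instead of panicking. The pattern in isolation:

```go
package main

import "fmt"

// contentAsString uses the comma-ok assertion: it returns "" instead of
// panicking when the underlying value is not a string.
func contentAsString(content interface{}) string {
	if val, ok := content.(string); ok {
		return val
	}
	return ""
}

func main() {
	fmt.Println(contentAsString("hello"))          // hello
	fmt.Println(contentAsString(map[string]int{})) // empty string
}
```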

View File

@@ -4,10 +4,9 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"time"
"github.com/zeromicro/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/timex"
"go.opentelemetry.io/otel/trace"
)
@@ -78,24 +77,16 @@ func (l *traceLogger) WithDuration(duration time.Duration) Logger {
}
func (l *traceLogger) write(writer io.Writer, level string, val interface{}) {
traceID := traceIdFromContext(l.ctx)
spanID := spanIdFromContext(l.ctx)
switch atomic.LoadUint32(&encoding) {
case plainEncodingType:
writePlainAny(writer, level, val, l.Duration, traceID, spanID)
default:
outputJson(writer, &traceLogger{
logEntry: logEntry{
Timestamp: getTimestamp(),
Level: level,
Duration: l.Duration,
Content: val,
},
Trace: traceID,
Span: spanID,
})
}
outputJson(writer, &traceLogger{
logEntry: logEntry{
Timestamp: getTimestamp(),
Level: level,
Duration: l.Duration,
Content: val,
},
Trace: traceIdFromContext(l.ctx),
Span: spanIdFromContext(l.ctx),
})
}
// WithContext sets ctx to log, for keeping tracing information.

View File

@@ -82,37 +82,6 @@ func TestTraceInfo(t *testing.T) {
assert.True(t, strings.Contains(buf.String(), spanKey))
}
func TestTraceInfoConsole(t *testing.T) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, jsonEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
var buf mockWriter
atomic.StoreUint32(&initialized, 1)
infoLog = newLogWriter(log.New(&buf, "", flags))
otp := otel.GetTracerProvider()
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
defer otel.SetTracerProvider(otp)
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
l := WithContext(ctx).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Info(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
buf.Reset()
l.WithDuration(time.Second).Infof(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
buf.Reset()
l.WithDuration(time.Second).Infov(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
}
func TestTraceSlow(t *testing.T) {
var buf mockWriter
atomic.StoreUint32(&initialized, 1)

View File

@@ -3,7 +3,7 @@ package mapping
import (
"io"
"github.com/zeromicro/go-zero/core/jsonx"
"github.com/tal-tech/go-zero/core/jsonx"
)
const jsonTagKey = "json"

View File

@@ -9,9 +9,9 @@ import (
"sync"
"time"
"github.com/zeromicro/go-zero/core/jsonx"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/jsonx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/stringx"
)
const (
@@ -742,9 +742,7 @@ func getValueWithChainedKeys(m Valuer, keys []string) (interface{}, bool) {
if len(keys) == 1 {
v, ok := m.Value(keys[0])
return v, ok
}
if len(keys) > 1 {
} else if len(keys) > 1 {
if v, ok := m.Value(keys[0]); ok {
if nextm, ok := v.(map[string]interface{}); ok {
return getValueWithChainedKeys(MapValuer(nextm), keys[1:])

View File

@@ -8,7 +8,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/stringx"
)
// because json.Number doesn't support strconv.ParseUint(...),

View File

@@ -10,7 +10,7 @@ import (
"strings"
"sync"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/stringx"
)
const (

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"io"
"io/ioutil"
"gopkg.in/yaml.v2"
)
@@ -13,7 +14,7 @@ const yamlTagKey = "json"
var (
// ErrUnsupportedType is an error that indicates the config format is not supported.
ErrUnsupportedType = errors.New("only map-like configs are supported")
ErrUnsupportedType = errors.New("only map-like configs are suported")
yamlUnmarshaler = NewUnmarshaler(yamlTagKey)
)
@@ -28,6 +29,39 @@ func UnmarshalYamlReader(reader io.Reader, v interface{}) error {
return unmarshalYamlReader(reader, v, yamlUnmarshaler)
}
func unmarshalYamlBytes(content []byte, v interface{}, unmarshaler *Unmarshaler) error {
var o interface{}
if err := yamlUnmarshal(content, &o); err != nil {
return err
}
if m, ok := o.(map[string]interface{}); ok {
return unmarshaler.Unmarshal(m, v)
}
return ErrUnsupportedType
}
func unmarshalYamlReader(reader io.Reader, v interface{}, unmarshaler *Unmarshaler) error {
content, err := ioutil.ReadAll(reader)
if err != nil {
return err
}
return unmarshalYamlBytes(content, v, unmarshaler)
}
// yamlUnmarshal YAML to map[string]interface{} instead of map[interface{}]interface{}.
func yamlUnmarshal(in []byte, out interface{}) error {
var res interface{}
if err := yaml.Unmarshal(in, &res); err != nil {
return err
}
*out.(*interface{}) = cleanupMapValue(res)
return nil
}
func cleanupInterfaceMap(in map[interface{}]interface{}) map[string]interface{} {
res := make(map[string]interface{})
for k, v := range in {
@@ -62,40 +96,3 @@ func cleanupMapValue(v interface{}) interface{} {
return Repr(v)
}
}
func unmarshal(unmarshaler *Unmarshaler, o interface{}, v interface{}) error {
if m, ok := o.(map[string]interface{}); ok {
return unmarshaler.Unmarshal(m, v)
}
return ErrUnsupportedType
}
func unmarshalYamlBytes(content []byte, v interface{}, unmarshaler *Unmarshaler) error {
var o interface{}
if err := yamlUnmarshal(content, &o); err != nil {
return err
}
return unmarshal(unmarshaler, o, v)
}
func unmarshalYamlReader(reader io.Reader, v interface{}, unmarshaler *Unmarshaler) error {
var res interface{}
if err := yaml.NewDecoder(reader).Decode(&res); err != nil {
return err
}
return unmarshal(unmarshaler, cleanupMapValue(res), v)
}
// yamlUnmarshal YAML to map[string]interface{} instead of map[interface{}]interface{}.
func yamlUnmarshal(in []byte, out interface{}) error {
var res interface{}
if err := yaml.Unmarshal(in, &res); err != nil {
return err
}
*out.(*interface{}) = cleanupMapValue(res)
return nil
}
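
Both versions of this file rely on the same trick: `gopkg.in/yaml.v2` decodes mappings into `map[interface{}]interface{}`, which `encoding/json` cannot re-encode, so keys are normalized to strings first (`cleanupMapValue` above). A compact sketch of that normalization, assuming yaml.v2:

```go
package main

import (
	"encoding/json"
	"fmt"

	"gopkg.in/yaml.v2"
)

// toStringKeys recursively converts yaml.v2's map[interface{}]interface{}
// into map[string]interface{} so the value becomes JSON-friendly.
func toStringKeys(v interface{}) interface{} {
	switch val := v.(type) {
	case map[interface{}]interface{}:
		res := make(map[string]interface{}, len(val))
		for k, item := range val {
			res[fmt.Sprint(k)] = toStringKeys(item)
		}
		return res
	case []interface{}:
		for i, item := range val {
			val[i] = toStringKeys(item)
		}
		return val
	default:
		return v
	}
}

func main() {
	var raw interface{}
	_ = yaml.Unmarshal([]byte("name: kevin\nports:\n  - 8080\n  - 8081\n"), &raw)

	out, err := json.Marshal(toStringKeys(raw))
	fmt.Println(string(out), err) // {"name":"kevin","ports":[8080,8081]} <nil>
}
```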

View File

@@ -926,17 +926,14 @@ func TestUnmarshalYamlBytesError(t *testing.T) {
}
func TestUnmarshalYamlReaderError(t *testing.T) {
payload := `abcd: cdef`
reader := strings.NewReader(payload)
var v struct {
Any string
}
reader := strings.NewReader(`abcd: cdef`)
err := UnmarshalYamlReader(reader, &v)
assert.NotNil(t, err)
reader = strings.NewReader("chenquan")
err = UnmarshalYamlReader(reader, &v)
assert.ErrorIs(t, err, ErrUnsupportedType)
}
func TestUnmarshalYamlBadReader(t *testing.T) {
@@ -1014,6 +1011,6 @@ func TestUnmarshalYamlMapRune(t *testing.T) {
type badReader struct{}
func (b *badReader) Read(_ []byte) (n int, err error) {
func (b *badReader) Read(p []byte) (n int, err error) {
return 0, io.ErrLimitReached
}

View File

@@ -4,7 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/stringx"
)
func TestMaxInt(t *testing.T) {

View File

@@ -2,7 +2,7 @@ package metric
import (
prom "github.com/prometheus/client_golang/prometheus"
"github.com/zeromicro/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/proc"
)
type (

View File

@@ -2,7 +2,7 @@ package metric
import (
prom "github.com/prometheus/client_golang/prometheus"
"github.com/zeromicro/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/proc"
)
type (

View File

@@ -2,7 +2,7 @@ package metric
import (
prom "github.com/prometheus/client_golang/prometheus"
"github.com/zeromicro/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/proc"
)
type (

View File

@@ -3,11 +3,12 @@ package mr
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/zeromicro/go-zero/core/errorx"
"github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/errorx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/threading"
)
const (
@@ -23,12 +24,12 @@ var (
)
type (
// ForEachFunc is used to do element processing, but no output.
ForEachFunc func(item interface{})
// GenerateFunc is used to let callers send elements into source.
GenerateFunc func(source chan<- interface{})
// MapFunc is used to do element processing and write the output to writer.
MapFunc func(item interface{}, writer Writer)
// VoidMapFunc is used to do element processing, but no output.
VoidMapFunc func(item interface{})
// MapperFunc is used to do element processing and write the output to writer,
// use cancel func to cancel the processing.
MapperFunc func(item interface{}, writer Writer, cancel func(error))
@@ -41,16 +42,6 @@ type (
// Option defines the method to customize the mapreduce.
Option func(opts *mapReduceOptions)
mapperContext struct {
ctx context.Context
mapper MapFunc
source <-chan interface{}
panicChan *onceChan
collector chan<- interface{}
doneChan <-chan lang.PlaceholderType
workers int
}
mapReduceOptions struct {
ctx context.Context
workers int
@@ -78,6 +69,7 @@ func Finish(fns ...func() error) error {
cancel(err)
}
}, func(pipe <-chan interface{}, cancel func(error)) {
drain(pipe)
}, WithWorkers(len(fns)))
}
@@ -87,7 +79,7 @@ func FinishVoid(fns ...func()) {
return
}
ForEach(func(source chan<- interface{}) {
MapVoid(func(source chan<- interface{}) {
for _, fn := range fns {
source <- fn
}
@@ -97,74 +89,41 @@ func FinishVoid(fns ...func()) {
}, WithWorkers(len(fns)))
}
// ForEach maps all elements from given generate but no output.
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
// Map maps all elements generated from given generate func, and returns an output channel.
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
options := buildOptions(opts...)
panicChan := &onceChan{channel: make(chan interface{})}
source := buildSource(generate, panicChan)
source := buildSource(generate)
collector := make(chan interface{}, options.workers)
done := make(chan lang.PlaceholderType)
go executeMappers(mapperContext{
ctx: options.ctx,
mapper: func(item interface{}, writer Writer) {
mapper(item)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})
go executeMappers(options.ctx, mapper, source, collector, done, options.workers)
for {
select {
case v := <-panicChan.channel:
panic(v)
case _, ok := <-collector:
if !ok {
return
}
}
}
return collector
}
// MapReduce maps all elements generated from given generate func,
// and reduces the output elements with given reducer.
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {
panicChan := &onceChan{channel: make(chan interface{})}
source := buildSource(generate, panicChan)
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
source := buildSource(generate)
return MapReduceWithSource(source, mapper, reducer, opts...)
}
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
// MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer.
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {
panicChan := &onceChan{channel: make(chan interface{})}
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
}
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
reducer ReducerFunc, opts ...Option) (interface{}, error) {
options := buildOptions(opts...)
// output is used to write the final result
output := make(chan interface{})
defer func() {
// reducer can only write once, if more, panic
for range output {
panic("more than one element written in reducer")
}
}()
// collector is used to collect data from mapper, and consume in reducer
collector := make(chan interface{}, options.workers)
// if done is closed, all mappers and reducer should stop processing
done := make(chan lang.PlaceholderType)
writer := newGuardedWriter(options.ctx, output, done)
var closeOnce sync.Once
// use atomic.Value to avoid data race
var retErr errorx.AtomicError
finish := func() {
closeOnce.Do(func() {
@@ -186,41 +145,28 @@ func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapp
go func() {
defer func() {
drain(collector)
if r := recover(); r != nil {
panicChan.write(r)
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
finish()
}()
reducer(collector, writer, cancel)
}()
go executeMappers(mapperContext{
ctx: options.ctx,
mapper: func(item interface{}, w Writer) {
mapper(item, w, cancel)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})
go executeMappers(options.ctx, func(item interface{}, w Writer) {
mapper(item, w, cancel)
}, source, collector, done, options.workers)
select {
case <-options.ctx.Done():
cancel(context.DeadlineExceeded)
return nil, context.DeadlineExceeded
case v := <-panicChan.channel:
panic(v)
case v, ok := <-output:
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return v, nil
} else {
return nil, ErrReduceNoOutput
}
value, ok := <-output
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return value, nil
} else {
return nil, ErrReduceNoOutput
}
}
@@ -229,14 +175,20 @@ func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapp
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
reducer(input, cancel)
// We need to write a placeholder to let MapReduce to continue on reducer done,
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
writer.Write(lang.Placeholder)
}, opts...)
if errors.Is(err, ErrReduceNoOutput) {
return nil
}
return err
}
// MapVoid maps all elements from given generate but no output.
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option) {
drain(Map(generate, func(item interface{}, writer Writer) {
mapper(item)
}, opts...))
}
// WithContext customizes a mapreduce processing accepts a given ctx.
func WithContext(ctx context.Context) Option {
return func(opts *mapReduceOptions) {
@@ -264,18 +216,12 @@ func buildOptions(opts ...Option) *mapReduceOptions {
return options
}
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
func buildSource(generate GenerateFunc) chan interface{} {
source := make(chan interface{})
go func() {
defer func() {
if r := recover(); r != nil {
panicChan.write(r)
}
close(source)
}()
threading.GoSafe(func() {
defer close(source)
generate(source)
}()
})
return source
}
@@ -287,43 +233,39 @@ func drain(channel <-chan interface{}) {
}
}
func executeMappers(mCtx mapperContext) {
func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
var wg sync.WaitGroup
defer func() {
wg.Wait()
close(mCtx.collector)
drain(mCtx.source)
close(collector)
}()
var failed int32
pool := make(chan lang.PlaceholderType, mCtx.workers)
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
for atomic.LoadInt32(&failed) == 0 {
pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(ctx, collector, done)
for {
select {
case <-mCtx.ctx.Done():
case <-ctx.Done():
return
case <-mCtx.doneChan:
case <-done:
return
case pool <- lang.Placeholder:
item, ok := <-mCtx.source
item, ok := <-input
if !ok {
<-pool
return
}
wg.Add(1)
go func() {
// better to safely run caller defined method
threading.GoSafe(func() {
defer func() {
if r := recover(); r != nil {
atomic.AddInt32(&failed, 1)
mCtx.panicChan.write(r)
}
wg.Done()
<-pool
}()
mCtx.mapper(item, writer)
}()
mapper(item, writer)
})
}
}
}
@@ -369,16 +311,3 @@ func (gw guardedWriter) Write(v interface{}) {
gw.channel <- v
}
}
type onceChan struct {
channel chan interface{}
wrote int32
}
func (oc *onceChan) write(val interface{}) {
if atomic.AddInt32(&oc.wrote, 1) > 1 {
return
}
oc.channel <- val
}
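
The `onceChan` shown above exists so that only the first panic from a worker goroutine is forwarded to the caller, which then re-panics exactly once. A standalone sketch of that propagation idea, simplified from the hunk (the real code also coordinates collector and done channels):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// onceChan accepts only the first write; later values are dropped so the
// caller sees exactly one panic value.
type onceChan struct {
	channel chan interface{}
	wrote   int32
}

func (oc *onceChan) write(val interface{}) {
	if atomic.AddInt32(&oc.wrote, 1) > 1 {
		return
	}
	oc.channel <- val
}

func main() {
	pc := &onceChan{channel: make(chan interface{})}

	for i := 0; i < 3; i++ {
		go func(i int) {
			defer func() {
				if r := recover(); r != nil {
					pc.write(r) // only the first panic gets through
				}
			}()
			panic(fmt.Sprintf("worker %d failed", i))
		}(i)
	}

	fmt.Println("caller sees:", <-pc.channel)
}
```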

View File

@@ -1,78 +0,0 @@
//go:build go1.18
// +build go1.18
package mr
import (
"fmt"
"math/rand"
"runtime"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)
func FuzzMapReduce(f *testing.F) {
rand.Seed(time.Now().UnixNano())
f.Add(uint(10), uint(runtime.NumCPU()))
f.Fuzz(func(t *testing.T, num uint, workers uint) {
n := int64(num)%5000 + 5000
genPanic := rand.Intn(100) == 0
mapperPanic := rand.Intn(100) == 0
reducerPanic := rand.Intn(100) == 0
genIdx := rand.Int63n(n)
mapperIdx := rand.Int63n(n)
reducerIdx := rand.Int63n(n)
squareSum := (n - 1) * n * (2*n - 1) / 6
fn := func() (interface{}, error) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
return MapReduce(func(source chan<- interface{}) {
for i := int64(0); i < n; i++ {
source <- i
if genPanic && i == genIdx {
panic("foo")
}
}
}, func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int64)
if mapperPanic && v == mapperIdx {
panic("bar")
}
writer.Write(v * v)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var idx int64
var total int64
for v := range pipe {
if reducerPanic && idx == reducerIdx {
panic("baz")
}
total += v.(int64)
idx++
}
writer.Write(total)
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
}
if genPanic || mapperPanic || reducerPanic {
var buf strings.Builder
buf.WriteString(fmt.Sprintf("n: %d", n))
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
assert.Panicsf(t, func() { fn() }, buf.String())
} else {
val, err := fn()
assert.Nil(t, err)
assert.Equal(t, squareSum, val.(int64))
}
})
}

View File

@@ -1,107 +0,0 @@
//go:build fuzz
// +build fuzz
package mr
import (
"fmt"
"math/rand"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/threading"
"gopkg.in/cheggaaa/pb.v1"
)
// If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
// so we need to simulate the fuzz test in test mode.
func TestMapReduceRandom(t *testing.T) {
rand.Seed(time.Now().UnixNano())
const (
times = 10000
nRange = 500
mega = 1024 * 1024
)
bar := pb.New(times).Start()
runner := threading.NewTaskRunner(runtime.NumCPU())
var wg sync.WaitGroup
wg.Add(times)
for i := 0; i < times; i++ {
runner.Schedule(func() {
start := time.Now()
defer func() {
if time.Since(start) > time.Minute {
t.Fatal("timeout")
}
wg.Done()
}()
t.Run(strconv.Itoa(i), func(t *testing.T) {
n := rand.Int63n(nRange)%nRange + nRange
workers := rand.Int()%50 + runtime.NumCPU()/2
genPanic := rand.Intn(100) == 0
mapperPanic := rand.Intn(100) == 0
reducerPanic := rand.Intn(100) == 0
genIdx := rand.Int63n(n)
mapperIdx := rand.Int63n(n)
reducerIdx := rand.Int63n(n)
squareSum := (n - 1) * n * (2*n - 1) / 6
fn := func() (interface{}, error) {
return MapReduce(func(source chan<- interface{}) {
for i := int64(0); i < n; i++ {
source <- i
if genPanic && i == genIdx {
panic("foo")
}
}
}, func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int64)
if mapperPanic && v == mapperIdx {
panic("bar")
}
writer.Write(v * v)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var idx int64
var total int64
for v := range pipe {
if reducerPanic && idx == reducerIdx {
panic("baz")
}
total += v.(int64)
idx++
}
writer.Write(total)
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
}
if genPanic || mapperPanic || reducerPanic {
var buf strings.Builder
buf.WriteString(fmt.Sprintf("n: %d", n))
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
assert.Panicsf(t, func() { fn() }, buf.String())
} else {
val, err := fn()
assert.Nil(t, err)
assert.Equal(t, squareSum, val.(int64))
}
bar.Increment()
})
})
}
wg.Wait()
bar.Finish()
}

View File

@@ -11,7 +11,8 @@ import (
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/syncx"
)
var errDummy = errors.New("dummy")
@@ -21,8 +22,6 @@ func init() {
}
func TestFinish(t *testing.T) {
defer goleak.VerifyNone(t)
var total uint32
err := Finish(func() error {
atomic.AddUint32(&total, 2)
@@ -40,20 +39,14 @@ func TestFinish(t *testing.T) {
}
func TestFinishNone(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Nil(t, Finish())
}
func TestFinishVoidNone(t *testing.T) {
defer goleak.VerifyNone(t)
FinishVoid()
}
func TestFinishErr(t *testing.T) {
defer goleak.VerifyNone(t)
var total uint32
err := Finish(func() error {
atomic.AddUint32(&total, 2)
@@ -70,8 +63,6 @@ func TestFinishErr(t *testing.T) {
}
func TestFinishVoid(t *testing.T) {
defer goleak.VerifyNone(t)
var total uint32
FinishVoid(func() {
atomic.AddUint32(&total, 2)
@@ -84,107 +75,70 @@ func TestFinishVoid(t *testing.T) {
assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
}
func TestForEach(t *testing.T) {
const tasks = 1000
func TestMap(t *testing.T) {
tests := []struct {
mapper MapFunc
expect int
}{
{
mapper: func(item interface{}, writer Writer) {
v := item.(int)
writer.Write(v * v)
},
expect: 30,
},
{
mapper: func(item interface{}, writer Writer) {
v := item.(int)
if v%2 == 0 {
return
}
writer.Write(v * v)
},
expect: 10,
},
{
mapper: func(item interface{}, writer Writer) {
v := item.(int)
if v%2 == 0 {
panic(v)
}
writer.Write(v * v)
},
expect: 10,
},
}
t.Run("all", func(t *testing.T) {
defer goleak.VerifyNone(t)
var count uint32
ForEach(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}) {
atomic.AddUint32(&count, 1)
}, WithWorkers(-1))
assert.Equal(t, tasks, int(count))
})
t.Run("odd", func(t *testing.T) {
defer goleak.VerifyNone(t)
var count uint32
ForEach(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}) {
if item.(int)%2 == 0 {
atomic.AddUint32(&count, 1)
}
})
assert.Equal(t, tasks/2, int(count))
})
t.Run("all", func(t *testing.T) {
defer goleak.VerifyNone(t)
assert.PanicsWithValue(t, "foo", func() {
ForEach(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
for _, test := range tests {
t.Run(stringx.Rand(), func(t *testing.T) {
channel := Map(func(source chan<- interface{}) {
for i := 1; i < 5; i++ {
source <- i
}
}, func(item interface{}) {
panic("foo")
})
}, test.mapper, WithWorkers(-1))
var result int
for v := range channel {
result += v.(int)
}
assert.Equal(t, test.expect, result)
})
})
}
func TestGeneratePanic(t *testing.T) {
defer goleak.VerifyNone(t)
t.Run("all", func(t *testing.T) {
assert.PanicsWithValue(t, "foo", func() {
ForEach(func(source chan<- interface{}) {
panic("foo")
}, func(item interface{}) {
})
})
})
}
func TestMapperPanic(t *testing.T) {
defer goleak.VerifyNone(t)
const tasks = 1000
var run int32
t.Run("all", func(t *testing.T) {
assert.PanicsWithValue(t, "foo", func() {
_, _ = MapReduce(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}, writer Writer, cancel func(error)) {
atomic.AddInt32(&run, 1)
panic("foo")
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
})
})
assert.True(t, atomic.LoadInt32(&run) < tasks/2)
})
}
}
func TestMapReduce(t *testing.T) {
defer goleak.VerifyNone(t)
tests := []struct {
name string
mapper MapperFunc
reducer ReducerFunc
expectErr error
expectValue interface{}
}{
{
name: "simple",
expectErr: nil,
expectValue: 30,
},
{
name: "cancel with error",
mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
if v%3 == 0 {
@@ -195,7 +149,6 @@ func TestMapReduce(t *testing.T) {
expectErr: errDummy,
},
{
name: "cancel with nil",
mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
if v%3 == 0 {
@@ -207,7 +160,6 @@ func TestMapReduce(t *testing.T) {
expectValue: nil,
},
{
name: "cancel with more",
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var result int
for item := range pipe {
@@ -222,74 +174,36 @@ func TestMapReduce(t *testing.T) {
},
}
t.Run("MapReduce", func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mapper == nil {
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
writer.Write(v * v)
}
for _, test := range tests {
t.Run(stringx.Rand(), func(t *testing.T) {
if test.mapper == nil {
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
writer.Write(v * v)
}
if test.reducer == nil {
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var result int
for item := range pipe {
result += item.(int)
}
writer.Write(result)
}
if test.reducer == nil {
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var result int
for item := range pipe {
result += item.(int)
}
writer.Write(result)
}
value, err := MapReduce(func(source chan<- interface{}) {
for i := 1; i < 5; i++ {
source <- i
}
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
assert.Equal(t, test.expectErr, err)
assert.Equal(t, test.expectValue, value)
})
}
})
t.Run("MapReduce", func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mapper == nil {
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
writer.Write(v * v)
}
}
if test.reducer == nil {
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var result int
for item := range pipe {
result += item.(int)
}
writer.Write(result)
}
}
value, err := MapReduce(func(source chan<- interface{}) {
for i := 1; i < 5; i++ {
source <- i
}
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
source := make(chan interface{})
go func() {
for i := 1; i < 5; i++ {
source <- i
}
close(source)
}()
value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
assert.Equal(t, test.expectErr, err)
assert.Equal(t, test.expectValue, value)
})
}
})
assert.Equal(t, test.expectErr, err)
assert.Equal(t, test.expectValue, value)
})
}
}
func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Panics(t, func() {
MapReduce(func(source chan<- interface{}) {
for i := 0; i < 10; i++ {
@@ -306,23 +220,18 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
}
func TestMapReduceVoid(t *testing.T) {
defer goleak.VerifyNone(t)
var value uint32
tests := []struct {
name string
mapper MapperFunc
reducer VoidReducerFunc
expectValue uint32
expectErr error
}{
{
name: "simple",
expectValue: 30,
expectErr: nil,
},
{
name: "cancel with error",
mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
if v%3 == 0 {
@@ -333,7 +242,6 @@ func TestMapReduceVoid(t *testing.T) {
expectErr: errDummy,
},
{
name: "cancel with nil",
mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
if v%3 == 0 {
@@ -344,7 +252,6 @@ func TestMapReduceVoid(t *testing.T) {
expectErr: ErrCancelWithNil,
},
{
name: "cancel with more",
reducer: func(pipe <-chan interface{}, cancel func(error)) {
for item := range pipe {
result := atomic.AddUint32(&value, uint32(item.(int)))
@@ -358,7 +265,7 @@ func TestMapReduceVoid(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Run(stringx.Rand(), func(t *testing.T) {
atomic.StoreUint32(&value, 0)
if test.mapper == nil {
@@ -389,8 +296,6 @@ func TestMapReduceVoid(t *testing.T) {
}
func TestMapReduceVoidWithDelay(t *testing.T) {
defer goleak.VerifyNone(t)
var result []int
err := MapReduceVoid(func(source chan<- interface{}) {
source <- 0
@@ -413,64 +318,38 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
assert.Equal(t, 0, result[1])
}
func TestMapVoid(t *testing.T) {
const tasks = 1000
var count uint32
MapVoid(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}) {
atomic.AddUint32(&count, 1)
})
assert.Equal(t, tasks, int(count))
}
func TestMapReducePanic(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- interface{}) {
source <- 0
source <- 1
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
writer.Write(i)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
for range pipe {
panic("panic")
}
})
})
}
func TestMapReducePanicOnce(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- interface{}) {
for i := 0; i < 100; i++ {
source <- i
}
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
if i == 0 {
panic("foo")
}
writer.Write(i)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
for range pipe {
panic("bar")
}
})
})
}
func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- interface{}) {
source <- 0
source <- 1
}, func(item interface{}, writer Writer, cancel func(error)) {
panic("foo")
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
panic("bar")
})
v, err := MapReduce(func(source chan<- interface{}) {
source <- 0
source <- 1
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
writer.Write(i)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
for range pipe {
panic("panic")
}
})
assert.Nil(t, v)
assert.NotNil(t, err)
assert.Equal(t, "panic", err.Error())
}
func TestMapReduceVoidCancel(t *testing.T) {
defer goleak.VerifyNone(t)
var result []int
err := MapReduceVoid(func(source chan<- interface{}) {
source <- 0
@@ -492,15 +371,13 @@ func TestMapReduceVoidCancel(t *testing.T) {
}
func TestMapReduceVoidCancelWithRemains(t *testing.T) {
defer goleak.VerifyNone(t)
var done int32
var done syncx.AtomicBool
var result []int
err := MapReduceVoid(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ {
source <- i
}
atomic.AddInt32(&done, 1)
done.Set(true)
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
if i == defaultWorkers/2 {
@@ -515,12 +392,10 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
})
assert.NotNil(t, err)
assert.Equal(t, "anything", err.Error())
assert.Equal(t, int32(1), done)
assert.True(t, done.True())
}
func TestMapReduceWithoutReducerWrite(t *testing.T) {
defer goleak.VerifyNone(t)
uids := []int{1, 2, 3}
res, err := MapReduce(func(source chan<- interface{}) {
for _, uid := range uids {
@@ -537,54 +412,33 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) {
}
func TestMapReduceVoidPanicInReducer(t *testing.T) {
defer goleak.VerifyNone(t)
const message = "foo"
assert.Panics(t, func() {
var done int32
_ = MapReduceVoid(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ {
source <- i
}
atomic.AddInt32(&done, 1)
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
writer.Write(i)
}, func(pipe <-chan interface{}, cancel func(error)) {
panic(message)
}, WithWorkers(1))
})
}
func TestForEachWithContext(t *testing.T) {
defer goleak.VerifyNone(t)
var done int32
ctx, cancel := context.WithCancel(context.Background())
ForEach(func(source chan<- interface{}) {
var done syncx.AtomicBool
err := MapReduceVoid(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ {
source <- i
}
atomic.AddInt32(&done, 1)
}, func(item interface{}) {
done.Set(true)
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
if i == defaultWorkers/2 {
cancel()
}
}, WithContext(ctx))
writer.Write(i)
}, func(pipe <-chan interface{}, cancel func(error)) {
panic(message)
}, WithWorkers(1))
assert.NotNil(t, err)
assert.Equal(t, message, err.Error())
assert.True(t, done.True())
}
func TestMapReduceWithContext(t *testing.T) {
defer goleak.VerifyNone(t)
var done int32
var done syncx.AtomicBool
var result []int
ctx, cancel := context.WithCancel(context.Background())
err := MapReduceVoid(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ {
source <- i
}
atomic.AddInt32(&done, 1)
done.Set(true)
}, func(item interface{}, writer Writer, c func(error)) {
i := item.(int)
if i == defaultWorkers/2 {
@@ -598,7 +452,7 @@ func TestMapReduceWithContext(t *testing.T) {
}
}, WithContext(ctx))
assert.NotNil(t, err)
assert.Equal(t, context.DeadlineExceeded, err)
assert.Equal(t, ErrReduceNoOutput, err)
}
func BenchmarkMapReduce(b *testing.B) {

View File

@@ -54,7 +54,7 @@ import (
"fmt"
"log"
"github.com/zeromicro/go-zero/core/mr"
"github.com/tal-tech/go-zero/core/mr"
)
func main() {
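
The hunk above is truncated at func main(); the following is a minimal sketch of the generator/mapper/reducer flow the README documents, built from the interface{}-based signatures used in the tests earlier in this diff. The import path (the tal-tech form, as in this hunk) and the squared-sum example values are assumptions for illustration, not taken verbatim from the README.

package main

import (
	"fmt"
	"log"

	"github.com/tal-tech/go-zero/core/mr"
)

func main() {
	// generate: feed 1..4 into the source channel
	val, err := mr.MapReduce(func(source chan<- interface{}) {
		for i := 1; i < 5; i++ {
			source <- i
		}
	}, func(item interface{}, writer mr.Writer, cancel func(error)) {
		// map: square each item
		v := item.(int)
		writer.Write(v * v)
	}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
		// reduce: sum the squares and emit a single result
		sum := 0
		for v := range pipe {
			sum += v.(int)
		}
		writer.Write(sum)
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(val) // 30, matching the expectValue used in the tests above
}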

View File

@@ -55,7 +55,7 @@ import (
"fmt"
"log"
"github.com/zeromicro/go-zero/core/mr"
"github.com/tal-tech/go-zero/core/mr"
)
func main() {
@@ -87,4 +87,4 @@ More examples: [https://github.com/zeromicro/zero-examples/tree/main/mapreduce](
## Give a Star! ⭐
If you like or are using this project to learn or start your solution, please give it a star. Thanks!
If you like or are using this project to learn or start your solution, please give it a star. Thanks!

View File

@@ -11,7 +11,7 @@ import (
"syscall"
"time"
"github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/logx"
)
const (

View File

@@ -15,7 +15,7 @@ import (
"syscall"
"time"
"github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/logx"
)
// DefaultMemProfileRate is the default memory profiling rate.

View File

@@ -10,8 +10,8 @@ import (
"syscall"
"time"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/threading"
)
const (

View File

@@ -1,6 +1,3 @@
//go:build linux || darwin
// +build linux darwin
package proc
import (

Some files were not shown because too many files have changed in this diff.