Compare commits
41 Commits
tools/goct
...
tools/goct
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f46eab977 | ||
|
|
ec299085f5 | ||
|
|
7727d70634 | ||
|
|
5f9d101bc6 | ||
|
|
6c2abe7474 | ||
|
|
14a902c1a7 | ||
|
|
5ad6a6d229 | ||
|
|
6f4b97864a | ||
|
|
0e0abc3a95 | ||
|
|
696fda1db4 | ||
|
|
c1d2634427 | ||
|
|
4b7a680ac5 | ||
|
|
b3e7d2901f | ||
|
|
cdf7ec213c | ||
|
|
f1102fb262 | ||
|
|
09d1fad6e0 | ||
|
|
379c65a3ef | ||
|
|
fdc7f64d6f | ||
|
|
df0f8ed59e | ||
|
|
c903966fc7 | ||
|
|
e57fa8ff53 | ||
|
|
bf2feee5b7 | ||
|
|
ce05c429fc | ||
|
|
272a3f347d | ||
|
|
13db7a1931 | ||
|
|
468c237189 | ||
|
|
b9b80c068b | ||
|
|
9b592b3dee | ||
|
|
2203809e5e | ||
|
|
8d6d37f71e | ||
|
|
ea4f2af67f | ||
|
|
53af194ef9 | ||
|
|
5e0e2d2b14 | ||
|
|
74c99184c5 | ||
|
|
eb4b86137a | ||
|
|
9c4f4f3b4e | ||
|
|
240132e7c7 | ||
|
|
9d67fc4cfb | ||
|
|
892f93a716 | ||
|
|
ba6a7c9dc8 | ||
|
|
a91c3907a8 |
2
.github/workflows/go.yml
vendored
2
.github/workflows/go.yml
vendored
@@ -15,7 +15,7 @@ jobs:
|
||||
- name: Set up Go 1.x
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ^1.14
|
||||
go-version: ^1.15
|
||||
id: go
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
|
||||
18
.github/workflows/issue-translator.yml
vendored
Normal file
18
.github/workflows/issue-translator.yml
vendored
Normal file
@@ -0,0 +1,18 @@
|
||||
name: 'issue-translator'
|
||||
on:
|
||||
issue_comment:
|
||||
types: [created]
|
||||
issues:
|
||||
types: [opened]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: tomsun28/issues-translate-action@v2.6
|
||||
with:
|
||||
IS_MODIFY_TITLE: true
|
||||
# not require, default false, . Decide whether to modify the issue title
|
||||
# if true, the robot account @Issues-translate-bot must have modification permissions, invite @Issues-translate-bot to your project or use your custom bot.
|
||||
CUSTOM_BOT_NOTE: Bot detected the issue body's language is not English, translate it automatically. 👯👭🏻🧑🤝🧑👫🧑🏿🤝🧑🏻👩🏾🤝👨🏿👬🏿
|
||||
# not require. Customize the translation robot prefix message.
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -16,7 +16,8 @@
|
||||
**/logs
|
||||
|
||||
# for test purpose
|
||||
adhoc
|
||||
**/adhoc
|
||||
**/testdata
|
||||
|
||||
# gitlab ci
|
||||
.cache
|
||||
|
||||
@@ -40,7 +40,7 @@ We will help you to contribute in different areas like filing issues, developing
|
||||
getting your work reviewed and merged.
|
||||
|
||||
If you have questions about the development process,
|
||||
feel free to [file an issue](https://github.com/tal-tech/go-zero/issues/new/choose).
|
||||
feel free to [file an issue](https://github.com/zeromicro/go-zero/issues/new/choose).
|
||||
|
||||
## Find something to work on
|
||||
|
||||
@@ -50,10 +50,10 @@ Here is how you get started.
|
||||
|
||||
### Find a good first topic
|
||||
|
||||
[go-zero](https://github.com/tal-tech/go-zero) has beginner-friendly issues that provide a good first issue.
|
||||
For example, [go-zero](https://github.com/tal-tech/go-zero) has
|
||||
[help wanted](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
|
||||
[good first issue](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
|
||||
[go-zero](https://github.com/zeromicro/go-zero) has beginner-friendly issues that provide a good first issue.
|
||||
For example, [go-zero](https://github.com/zeromicro/go-zero) has
|
||||
[help wanted](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
|
||||
[good first issue](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
|
||||
labels for issues that should not need deep knowledge of the system.
|
||||
We can help new contributors who wish to work on such issues.
|
||||
|
||||
@@ -79,7 +79,7 @@ This is a rough outline of what a contributor's workflow looks like:
|
||||
- Create a topic branch from where to base the contribution. This is usually master.
|
||||
- Make commits of logical units.
|
||||
- Push changes in a topic branch to a personal fork of the repository.
|
||||
- Submit a pull request to [go-zero](https://github.com/tal-tech/go-zero).
|
||||
- Submit a pull request to [go-zero](https://github.com/zeromicro/go-zero).
|
||||
|
||||
## Creating Pull Requests
|
||||
|
||||
|
||||
@@ -4,8 +4,8 @@ import (
|
||||
"errors"
|
||||
"strconv"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/hash"
|
||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
||||
"github.com/zeromicro/go-zero/core/hash"
|
||||
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
|
||||
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
|
||||
)
|
||||
|
||||
func TestRedisBitSet_New_Set_Test(t *testing.T) {
|
||||
|
||||
@@ -6,11 +6,11 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/mathx"
|
||||
"github.com/tal-tech/go-zero/core/proc"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/mathx"
|
||||
"github.com/zeromicro/go-zero/core/proc"
|
||||
"github.com/zeromicro/go-zero/core/stat"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/zeromicro/go-zero/core/stat"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/zeromicro/go-zero/core/stat"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -4,8 +4,8 @@ import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/collection"
|
||||
"github.com/tal-tech/go-zero/core/mathx"
|
||||
"github.com/zeromicro/go-zero/core/collection"
|
||||
"github.com/zeromicro/go-zero/core/mathx"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -7,9 +7,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/collection"
|
||||
"github.com/tal-tech/go-zero/core/mathx"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/zeromicro/go-zero/core/collection"
|
||||
"github.com/zeromicro/go-zero/core/mathx"
|
||||
"github.com/zeromicro/go-zero/core/stat"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -8,8 +8,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/iox"
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/iox"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
)
|
||||
|
||||
func TestEnterToContinue(t *testing.T) {
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
// ErrPaddingSize indicates bad padding size.
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/fs"
|
||||
"github.com/zeromicro/go-zero/core/fs"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -6,9 +6,9 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/mathx"
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/mathx"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
type (
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
const duration = time.Millisecond * 50
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
func TestSafeMap(t *testing.T) {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package collection
|
||||
|
||||
import (
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -5,9 +5,9 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const drainWorkers = 8
|
||||
|
||||
@@ -8,10 +8,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/mapping"
|
||||
"github.com/zeromicro/go-zero/core/mapping"
|
||||
)
|
||||
|
||||
var loaders = map[string]func([]byte, interface{}) error{
|
||||
|
||||
@@ -6,8 +6,8 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/fs"
|
||||
"github.com/tal-tech/go-zero/core/hash"
|
||||
"github.com/zeromicro/go-zero/core/fs"
|
||||
"github.com/zeromicro/go-zero/core/hash"
|
||||
)
|
||||
|
||||
func TestLoadConfig_notExists(t *testing.T) {
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/iox"
|
||||
"github.com/zeromicro/go-zero/core/iox"
|
||||
)
|
||||
|
||||
// PropertyError represents a configuration error message.
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/fs"
|
||||
"github.com/zeromicro/go-zero/core/fs"
|
||||
)
|
||||
|
||||
func TestProperties(t *testing.T) {
|
||||
|
||||
@@ -3,7 +3,7 @@ package contextx
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/mapping"
|
||||
"github.com/zeromicro/go-zero/core/mapping"
|
||||
)
|
||||
|
||||
const contextTagKey = "ctx"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package discov
|
||||
|
||||
import "github.com/tal-tech/go-zero/core/discov/internal"
|
||||
import "github.com/zeromicro/go-zero/core/discov/internal"
|
||||
|
||||
// RegisterAccount registers the username/password to the given etcd cluster.
|
||||
func RegisterAccount(endpoints []string, user, pass string) {
|
||||
|
||||
@@ -4,8 +4,8 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
func TestRegisterAccount(t *testing.T) {
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
||||
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
||||
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||
)
|
||||
|
||||
var mockLock sync.Mutex
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
func TestAccount(t *testing.T) {
|
||||
|
||||
@@ -9,11 +9,11 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/contextx"
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/contextx"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
|
||||
@@ -7,10 +7,10 @@ import (
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/contextx"
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/contextx"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
package discov
|
||||
|
||||
import (
|
||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/proc"
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/proc"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
|
||||
@@ -8,10 +8,10 @@ import (
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
|
||||
@@ -4,9 +4,9 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
)
|
||||
|
||||
type (
|
||||
|
||||
@@ -5,8 +5,8 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
)
|
||||
|
||||
// A DelayExecutor delays a tasks on given delay interval.
|
||||
|
||||
@@ -3,8 +3,8 @@ package executors
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
// A LessExecutor is an executor to limit execution once within given time interval.
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
func TestLessExecutor_DoOrDiscard(t *testing.T) {
|
||||
|
||||
@@ -6,11 +6,11 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/proc"
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/proc"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const idleRound = 10
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const threshold = 10
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/fs"
|
||||
"github.com/zeromicro/go-zero/core/fs"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/fs"
|
||||
"github.com/zeromicro/go-zero/core/fs"
|
||||
)
|
||||
|
||||
func TestSplitLineChunks(t *testing.T) {
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/fs"
|
||||
"github.com/zeromicro/go-zero/core/fs"
|
||||
)
|
||||
|
||||
func TestRangeReader(t *testing.T) {
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/hash"
|
||||
"github.com/zeromicro/go-zero/core/hash"
|
||||
)
|
||||
|
||||
// TempFileWithText creates the temporary file with the given content,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package fx
|
||||
|
||||
import "github.com/tal-tech/go-zero/core/threading"
|
||||
import "github.com/zeromicro/go-zero/core/threading"
|
||||
|
||||
// Parallel runs fns parallelly and waits for done.
|
||||
func Parallel(fns ...func()) {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package fx
|
||||
|
||||
import "github.com/tal-tech/go-zero/core/errorx"
|
||||
import "github.com/zeromicro/go-zero/core/errorx"
|
||||
|
||||
const defaultRetryTimes = 3
|
||||
|
||||
|
||||
@@ -4,9 +4,9 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/collection"
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/collection"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -13,7 +13,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
func TestBuffer(t *testing.T) {
|
||||
@@ -563,9 +564,6 @@ func equal(t *testing.T, stream Stream, data []interface{}) {
|
||||
}
|
||||
|
||||
func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
|
||||
goroutines := runtime.NumGoroutine()
|
||||
defer goleak.VerifyNone(t)
|
||||
fn(t)
|
||||
// let scheduler schedule first
|
||||
time.Sleep(time.Millisecond)
|
||||
assert.True(t, runtime.NumGoroutine() <= goroutines)
|
||||
}
|
||||
|
||||
@@ -6,8 +6,8 @@ import (
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/mapping"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/mapping"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/mathx"
|
||||
"github.com/zeromicro/go-zero/core/mathx"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -9,8 +9,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/fs"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/fs"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
func TestReadText(t *testing.T) {
|
||||
|
||||
@@ -5,12 +5,11 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
||||
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||
)
|
||||
|
||||
const (
|
||||
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
|
||||
periodScript = `local limit = tonumber(ARGV[1])
|
||||
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
|
||||
const periodScript = `local limit = tonumber(ARGV[1])
|
||||
local window = tonumber(ARGV[2])
|
||||
local current = redis.call("INCRBY", KEYS[1], 1)
|
||||
if current == 1 then
|
||||
@@ -23,8 +22,6 @@ elseif current == limit then
|
||||
else
|
||||
return 0
|
||||
end`
|
||||
zoneDiff = 3600 * 8 // GMT+8 for our services
|
||||
)
|
||||
|
||||
const (
|
||||
// Unknown means not initialized state.
|
||||
@@ -104,7 +101,9 @@ func (h *PeriodLimit) Take(key string) (int, error) {
|
||||
|
||||
func (h *PeriodLimit) calcExpireSeconds() int {
|
||||
if h.align {
|
||||
unix := time.Now().Unix() + zoneDiff
|
||||
now := time.Now()
|
||||
_, offset := now.Zone()
|
||||
unix := now.Unix() + int64(offset)
|
||||
return h.period - int(unix%int64(h.period))
|
||||
}
|
||||
|
||||
@@ -112,6 +111,8 @@ func (h *PeriodLimit) calcExpireSeconds() int {
|
||||
}
|
||||
|
||||
// Align returns a func to customize a PeriodLimit with alignment.
|
||||
// For example, if we want to limit end users with 5 sms verification messages every day,
|
||||
// we need to align with the local timezone and the start of the day.
|
||||
func Align() PeriodOption {
|
||||
return func(l *PeriodLimit) {
|
||||
l.align = true
|
||||
|
||||
@@ -5,8 +5,8 @@ import (
|
||||
|
||||
"github.com/alicebob/miniredis/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
||||
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
|
||||
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
|
||||
)
|
||||
|
||||
func TestPeriodLimit_Take(t *testing.T) {
|
||||
|
||||
@@ -7,8 +7,8 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||
xrate "golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
@@ -85,8 +85,8 @@ func (lim *TokenLimiter) Allow() bool {
|
||||
}
|
||||
|
||||
// AllowN reports whether n events may happen at time now.
|
||||
// Use this method if you intend to drop / skip events that exceed the rate rate.
|
||||
// Otherwise use Reserve or Wait.
|
||||
// Use this method if you intend to drop / skip events that exceed the rate.
|
||||
// Otherwise, use Reserve or Wait.
|
||||
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
|
||||
return lim.reserveN(now, n)
|
||||
}
|
||||
@@ -112,7 +112,8 @@ func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
|
||||
// Lua boolean false -> r Nil bulk reply
|
||||
if err == redis.Nil {
|
||||
return false
|
||||
} else if err != nil {
|
||||
}
|
||||
if err != nil {
|
||||
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
|
||||
lim.startMonitor()
|
||||
return lim.rescueLimiter.AllowN(now, n)
|
||||
|
||||
@@ -6,9 +6,9 @@ import (
|
||||
|
||||
"github.com/alicebob/miniredis/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
||||
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -7,11 +7,11 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/collection"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/collection"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stat"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -8,11 +8,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/collection"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/mathx"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/collection"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/mathx"
|
||||
"github.com/zeromicro/go-zero/core/stat"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -3,7 +3,7 @@ package load
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
)
|
||||
|
||||
// A ShedderGroup is a manager to manage key based shedders.
|
||||
|
||||
@@ -4,8 +4,8 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stat"
|
||||
)
|
||||
|
||||
type (
|
||||
|
||||
@@ -3,10 +3,11 @@ package logx
|
||||
// A LogConf is a logging config.
|
||||
type LogConf struct {
|
||||
ServiceName string `json:",optional"`
|
||||
Mode string `json:",default=console,options=console|file|volume"`
|
||||
Mode string `json:",default=console,options=[console,file,volume]"`
|
||||
Encoding string `json:",default=json,options=[json,plain]"`
|
||||
TimeFormat string `json:",optional"`
|
||||
Path string `json:",default=logs"`
|
||||
Level string `json:",default=info,options=info|error|severe"`
|
||||
Level string `json:",default=info,options=[info,error,severe]"`
|
||||
Compress bool `json:",optional"`
|
||||
KeepDays int `json:",optional"`
|
||||
StackCooldownMillis int `json:",default=100"`
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const durationCallerDepth = 3
|
||||
@@ -79,10 +79,15 @@ func (l *durationLogger) WithDuration(duration time.Duration) Logger {
|
||||
}
|
||||
|
||||
func (l *durationLogger) write(writer io.Writer, level string, val interface{}) {
|
||||
outputJson(writer, &durationLogger{
|
||||
Timestamp: getTimestamp(),
|
||||
Level: level,
|
||||
Content: val,
|
||||
Duration: l.Duration,
|
||||
})
|
||||
switch encoding {
|
||||
case plainEncodingType:
|
||||
writePlainAny(writer, level, val, l.Duration)
|
||||
default:
|
||||
outputJson(writer, &durationLogger{
|
||||
Timestamp: getTimestamp(),
|
||||
Level: level,
|
||||
Content: val,
|
||||
Duration: l.Duration,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,6 +37,19 @@ func TestWithDurationInfo(t *testing.T) {
|
||||
assert.True(t, strings.Contains(builder.String(), "duration"), builder.String())
|
||||
}
|
||||
|
||||
func TestWithDurationInfoConsole(t *testing.T) {
|
||||
old := encoding
|
||||
encoding = plainEncodingType
|
||||
defer func() {
|
||||
encoding = old
|
||||
}()
|
||||
|
||||
var builder strings.Builder
|
||||
log.SetOutput(&builder)
|
||||
WithDuration(time.Second).Info("foo")
|
||||
assert.True(t, strings.Contains(builder.String(), "ms"), builder.String())
|
||||
}
|
||||
|
||||
func TestWithDurationInfof(t *testing.T) {
|
||||
var builder strings.Builder
|
||||
log.SetOutput(&builder)
|
||||
|
||||
@@ -4,8 +4,8 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
type limitedExecutor struct {
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
func TestLimitedExecutor_logOrDiscard(t *testing.T) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package logx
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -17,9 +18,9 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/iox"
|
||||
"github.com/tal-tech/go-zero/core/sysx"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/iox"
|
||||
"github.com/zeromicro/go-zero/core/sysx"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -31,6 +32,15 @@ const (
|
||||
SevereLevel
|
||||
)
|
||||
|
||||
const (
|
||||
jsonEncodingType = iota
|
||||
plainEncodingType
|
||||
|
||||
jsonEncoding = "json"
|
||||
plainEncoding = "plain"
|
||||
plainEncodingSep = '\t'
|
||||
)
|
||||
|
||||
const (
|
||||
accessFilename = "access.log"
|
||||
errorFilename = "error.log"
|
||||
@@ -65,6 +75,7 @@ var (
|
||||
timeFormat = "2006-01-02T15:04:05.000Z07"
|
||||
writeConsole bool
|
||||
logLevel uint32
|
||||
encoding = jsonEncodingType
|
||||
// use uint32 for atomic operations
|
||||
disableStat uint32
|
||||
infoLog io.WriteCloser
|
||||
@@ -124,6 +135,12 @@ func SetUp(c LogConf) error {
|
||||
if len(c.TimeFormat) > 0 {
|
||||
timeFormat = c.TimeFormat
|
||||
}
|
||||
switch c.Encoding {
|
||||
case plainEncoding:
|
||||
encoding = plainEncodingType
|
||||
default:
|
||||
encoding = jsonEncodingType
|
||||
}
|
||||
|
||||
switch c.Mode {
|
||||
case consoleMode:
|
||||
@@ -407,21 +424,31 @@ func infoTextSync(msg string) {
|
||||
}
|
||||
|
||||
func outputAny(writer io.Writer, level string, val interface{}) {
|
||||
info := logEntry{
|
||||
Timestamp: getTimestamp(),
|
||||
Level: level,
|
||||
Content: val,
|
||||
switch encoding {
|
||||
case plainEncodingType:
|
||||
writePlainAny(writer, level, val)
|
||||
default:
|
||||
info := logEntry{
|
||||
Timestamp: getTimestamp(),
|
||||
Level: level,
|
||||
Content: val,
|
||||
}
|
||||
outputJson(writer, info)
|
||||
}
|
||||
outputJson(writer, info)
|
||||
}
|
||||
|
||||
func outputText(writer io.Writer, level, msg string) {
|
||||
info := logEntry{
|
||||
Timestamp: getTimestamp(),
|
||||
Level: level,
|
||||
Content: msg,
|
||||
switch encoding {
|
||||
case plainEncodingType:
|
||||
writePlainText(writer, level, msg)
|
||||
default:
|
||||
info := logEntry{
|
||||
Timestamp: getTimestamp(),
|
||||
Level: level,
|
||||
Content: msg,
|
||||
}
|
||||
outputJson(writer, info)
|
||||
}
|
||||
outputJson(writer, info)
|
||||
}
|
||||
|
||||
func outputError(writer io.Writer, msg string, callDepth int) {
|
||||
@@ -565,6 +592,62 @@ func statSync(msg string) {
|
||||
}
|
||||
}
|
||||
|
||||
func writePlainAny(writer io.Writer, level string, val interface{}, fields ...string) {
|
||||
switch v := val.(type) {
|
||||
case string:
|
||||
writePlainText(writer, level, v, fields...)
|
||||
case error:
|
||||
writePlainText(writer, level, v.Error(), fields...)
|
||||
case fmt.Stringer:
|
||||
writePlainText(writer, level, v.String(), fields...)
|
||||
default:
|
||||
var buf bytes.Buffer
|
||||
buf.WriteString(getTimestamp())
|
||||
buf.WriteByte(plainEncodingSep)
|
||||
buf.WriteString(level)
|
||||
for _, item := range fields {
|
||||
buf.WriteByte(plainEncodingSep)
|
||||
buf.WriteString(item)
|
||||
}
|
||||
buf.WriteByte(plainEncodingSep)
|
||||
if err := json.NewEncoder(&buf).Encode(val); err != nil {
|
||||
log.Println(err.Error())
|
||||
return
|
||||
}
|
||||
buf.WriteByte('\n')
|
||||
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
|
||||
log.Println(buf.String())
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := writer.Write(buf.Bytes()); err != nil {
|
||||
log.Println(err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func writePlainText(writer io.Writer, level, msg string, fields ...string) {
|
||||
var buf bytes.Buffer
|
||||
buf.WriteString(getTimestamp())
|
||||
buf.WriteByte(plainEncodingSep)
|
||||
buf.WriteString(level)
|
||||
for _, item := range fields {
|
||||
buf.WriteByte(plainEncodingSep)
|
||||
buf.WriteString(item)
|
||||
}
|
||||
buf.WriteByte(plainEncodingSep)
|
||||
buf.WriteString(msg)
|
||||
buf.WriteByte('\n')
|
||||
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
|
||||
log.Println(buf.String())
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := writer.Write(buf.Bytes()); err != nil {
|
||||
log.Println(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
type logWriter struct {
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
@@ -141,6 +141,78 @@ func TestStructedLogInfov(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestStructedLogInfoConsoleAny(t *testing.T) {
|
||||
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||
infoLog = writer
|
||||
}, func(v ...interface{}) {
|
||||
old := encoding
|
||||
encoding = plainEncodingType
|
||||
defer func() {
|
||||
encoding = old
|
||||
}()
|
||||
|
||||
Infov(v)
|
||||
})
|
||||
}
|
||||
|
||||
func TestStructedLogInfoConsoleAnyString(t *testing.T) {
|
||||
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||
infoLog = writer
|
||||
}, func(v ...interface{}) {
|
||||
old := encoding
|
||||
encoding = plainEncodingType
|
||||
defer func() {
|
||||
encoding = old
|
||||
}()
|
||||
|
||||
Infov(fmt.Sprint(v...))
|
||||
})
|
||||
}
|
||||
|
||||
func TestStructedLogInfoConsoleAnyError(t *testing.T) {
|
||||
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||
infoLog = writer
|
||||
}, func(v ...interface{}) {
|
||||
old := encoding
|
||||
encoding = plainEncodingType
|
||||
defer func() {
|
||||
encoding = old
|
||||
}()
|
||||
|
||||
Infov(errors.New(fmt.Sprint(v...)))
|
||||
})
|
||||
}
|
||||
|
||||
func TestStructedLogInfoConsoleAnyStringer(t *testing.T) {
|
||||
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||
infoLog = writer
|
||||
}, func(v ...interface{}) {
|
||||
old := encoding
|
||||
encoding = plainEncodingType
|
||||
defer func() {
|
||||
encoding = old
|
||||
}()
|
||||
|
||||
Infov(ValStringer{
|
||||
val: fmt.Sprint(v...),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestStructedLogInfoConsoleText(t *testing.T) {
|
||||
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||
infoLog = writer
|
||||
}, func(v ...interface{}) {
|
||||
old := encoding
|
||||
encoding = plainEncodingType
|
||||
defer func() {
|
||||
encoding = old
|
||||
}()
|
||||
|
||||
Info(fmt.Sprint(v...))
|
||||
})
|
||||
}
|
||||
|
||||
func TestStructedLogSlow(t *testing.T) {
|
||||
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
|
||||
slowLog = writer
|
||||
@@ -432,6 +504,17 @@ func doTestStructedLog(t *testing.T, level string, setup func(writer io.WriteClo
|
||||
assert.True(t, strings.Contains(val, message))
|
||||
}
|
||||
|
||||
func doTestStructedLogConsole(t *testing.T, setup func(writer io.WriteCloser),
|
||||
write func(...interface{})) {
|
||||
const message = "hello there"
|
||||
writer := new(mockWriter)
|
||||
setup(writer)
|
||||
atomic.StoreUint32(&initialized, 1)
|
||||
write(message)
|
||||
println(writer.String())
|
||||
assert.True(t, strings.Contains(writer.String(), message))
|
||||
}
|
||||
|
||||
func testSetLevelTwiceWithMode(t *testing.T, mode string) {
|
||||
SetUp(LogConf{
|
||||
Mode: mode,
|
||||
@@ -456,3 +539,11 @@ func testSetLevelTwiceWithMode(t *testing.T, mode string) {
|
||||
ErrorStackf(message)
|
||||
assert.Equal(t, 0, writer.builder.Len())
|
||||
}
|
||||
|
||||
type ValStringer struct {
|
||||
val string
|
||||
}
|
||||
|
||||
func (v ValStringer) String() string {
|
||||
return v.val
|
||||
}
|
||||
|
||||
@@ -13,9 +13,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/fs"
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/fs"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/fs"
|
||||
"github.com/zeromicro/go-zero/core/fs"
|
||||
)
|
||||
|
||||
func TestDailyRotateRuleMarkRotated(t *testing.T) {
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
@@ -77,16 +77,24 @@ func (l *traceLogger) WithDuration(duration time.Duration) Logger {
|
||||
}
|
||||
|
||||
func (l *traceLogger) write(writer io.Writer, level string, val interface{}) {
|
||||
outputJson(writer, &traceLogger{
|
||||
logEntry: logEntry{
|
||||
Timestamp: getTimestamp(),
|
||||
Level: level,
|
||||
Duration: l.Duration,
|
||||
Content: val,
|
||||
},
|
||||
Trace: traceIdFromContext(l.ctx),
|
||||
Span: spanIdFromContext(l.ctx),
|
||||
})
|
||||
traceID := traceIdFromContext(l.ctx)
|
||||
spanID := spanIdFromContext(l.ctx)
|
||||
|
||||
switch encoding {
|
||||
case plainEncodingType:
|
||||
writePlainAny(writer, level, val, l.Duration, traceID, spanID)
|
||||
default:
|
||||
outputJson(writer, &traceLogger{
|
||||
logEntry: logEntry{
|
||||
Timestamp: getTimestamp(),
|
||||
Level: level,
|
||||
Duration: l.Duration,
|
||||
Content: val,
|
||||
},
|
||||
Trace: traceID,
|
||||
Span: spanID,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// WithContext sets ctx to log, for keeping tracing information.
|
||||
|
||||
@@ -82,6 +82,37 @@ func TestTraceInfo(t *testing.T) {
|
||||
assert.True(t, strings.Contains(buf.String(), spanKey))
|
||||
}
|
||||
|
||||
func TestTraceInfoConsole(t *testing.T) {
|
||||
old := encoding
|
||||
encoding = plainEncodingType
|
||||
defer func() {
|
||||
encoding = old
|
||||
}()
|
||||
|
||||
var buf mockWriter
|
||||
atomic.StoreUint32(&initialized, 1)
|
||||
infoLog = newLogWriter(log.New(&buf, "", flags))
|
||||
otp := otel.GetTracerProvider()
|
||||
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
|
||||
otel.SetTracerProvider(tp)
|
||||
defer otel.SetTracerProvider(otp)
|
||||
|
||||
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
|
||||
l := WithContext(ctx).(*traceLogger)
|
||||
SetLevel(InfoLevel)
|
||||
l.WithDuration(time.Second).Info(testlog)
|
||||
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
|
||||
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
|
||||
buf.Reset()
|
||||
l.WithDuration(time.Second).Infof(testlog)
|
||||
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
|
||||
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
|
||||
buf.Reset()
|
||||
l.WithDuration(time.Second).Infov(testlog)
|
||||
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
|
||||
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
|
||||
}
|
||||
|
||||
func TestTraceSlow(t *testing.T) {
|
||||
var buf mockWriter
|
||||
atomic.StoreUint32(&initialized, 1)
|
||||
|
||||
@@ -3,7 +3,7 @@ package mapping
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/jsonx"
|
||||
"github.com/zeromicro/go-zero/core/jsonx"
|
||||
)
|
||||
|
||||
const jsonTagKey = "json"
|
||||
|
||||
@@ -9,9 +9,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/jsonx"
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/jsonx"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -742,7 +742,9 @@ func getValueWithChainedKeys(m Valuer, keys []string) (interface{}, bool) {
|
||||
if len(keys) == 1 {
|
||||
v, ok := m.Value(keys[0])
|
||||
return v, ok
|
||||
} else if len(keys) > 1 {
|
||||
}
|
||||
|
||||
if len(keys) > 1 {
|
||||
if v, ok := m.Value(keys[0]); ok {
|
||||
if nextm, ok := v.(map[string]interface{}); ok {
|
||||
return getValueWithChainedKeys(MapValuer(nextm), keys[1:])
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
// because json.Number doesn't support strconv.ParseUint(...),
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
func TestMaxInt(t *testing.T) {
|
||||
|
||||
@@ -2,7 +2,7 @@ package metric
|
||||
|
||||
import (
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/tal-tech/go-zero/core/proc"
|
||||
"github.com/zeromicro/go-zero/core/proc"
|
||||
)
|
||||
|
||||
type (
|
||||
|
||||
@@ -2,7 +2,7 @@ package metric
|
||||
|
||||
import (
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/tal-tech/go-zero/core/proc"
|
||||
"github.com/zeromicro/go-zero/core/proc"
|
||||
)
|
||||
|
||||
type (
|
||||
|
||||
@@ -2,7 +2,7 @@ package metric
|
||||
|
||||
import (
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/tal-tech/go-zero/core/proc"
|
||||
"github.com/zeromicro/go-zero/core/proc"
|
||||
)
|
||||
|
||||
type (
|
||||
|
||||
@@ -3,12 +3,11 @@ package mr
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/errorx"
|
||||
"github.com/tal-tech/go-zero/core/lang"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/errorx"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -24,12 +23,12 @@ var (
|
||||
)
|
||||
|
||||
type (
|
||||
// ForEachFunc is used to do element processing, but no output.
|
||||
ForEachFunc func(item interface{})
|
||||
// GenerateFunc is used to let callers send elements into source.
|
||||
GenerateFunc func(source chan<- interface{})
|
||||
// MapFunc is used to do element processing and write the output to writer.
|
||||
MapFunc func(item interface{}, writer Writer)
|
||||
// VoidMapFunc is used to do element processing, but no output.
|
||||
VoidMapFunc func(item interface{})
|
||||
// MapperFunc is used to do element processing and write the output to writer,
|
||||
// use cancel func to cancel the processing.
|
||||
MapperFunc func(item interface{}, writer Writer, cancel func(error))
|
||||
@@ -42,6 +41,16 @@ type (
|
||||
// Option defines the method to customize the mapreduce.
|
||||
Option func(opts *mapReduceOptions)
|
||||
|
||||
mapperContext struct {
|
||||
ctx context.Context
|
||||
mapper MapFunc
|
||||
source <-chan interface{}
|
||||
panicChan *onceChan
|
||||
collector chan<- interface{}
|
||||
doneChan <-chan lang.PlaceholderType
|
||||
workers int
|
||||
}
|
||||
|
||||
mapReduceOptions struct {
|
||||
ctx context.Context
|
||||
workers int
|
||||
@@ -69,7 +78,6 @@ func Finish(fns ...func() error) error {
|
||||
cancel(err)
|
||||
}
|
||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||
drain(pipe)
|
||||
}, WithWorkers(len(fns)))
|
||||
}
|
||||
|
||||
@@ -79,7 +87,7 @@ func FinishVoid(fns ...func()) {
|
||||
return
|
||||
}
|
||||
|
||||
MapVoid(func(source chan<- interface{}) {
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
for _, fn := range fns {
|
||||
source <- fn
|
||||
}
|
||||
@@ -89,41 +97,74 @@ func FinishVoid(fns ...func()) {
|
||||
}, WithWorkers(len(fns)))
|
||||
}
|
||||
|
||||
// Map maps all elements generated from given generate func, and returns an output channel.
|
||||
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
|
||||
// ForEach maps all elements from given generate but no output.
|
||||
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
|
||||
options := buildOptions(opts...)
|
||||
source := buildSource(generate)
|
||||
panicChan := &onceChan{channel: make(chan interface{})}
|
||||
source := buildSource(generate, panicChan)
|
||||
collector := make(chan interface{}, options.workers)
|
||||
done := make(chan lang.PlaceholderType)
|
||||
|
||||
go executeMappers(options.ctx, mapper, source, collector, done, options.workers)
|
||||
go executeMappers(mapperContext{
|
||||
ctx: options.ctx,
|
||||
mapper: func(item interface{}, writer Writer) {
|
||||
mapper(item)
|
||||
},
|
||||
source: source,
|
||||
panicChan: panicChan,
|
||||
collector: collector,
|
||||
doneChan: done,
|
||||
workers: options.workers,
|
||||
})
|
||||
|
||||
return collector
|
||||
for {
|
||||
select {
|
||||
case v := <-panicChan.channel:
|
||||
panic(v)
|
||||
case _, ok := <-collector:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MapReduce maps all elements generated from given generate func,
|
||||
// and reduces the output elements with given reducer.
|
||||
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
|
||||
opts ...Option) (interface{}, error) {
|
||||
source := buildSource(generate)
|
||||
return MapReduceWithSource(source, mapper, reducer, opts...)
|
||||
panicChan := &onceChan{channel: make(chan interface{})}
|
||||
source := buildSource(generate, panicChan)
|
||||
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||
}
|
||||
|
||||
// MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer.
|
||||
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
||||
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
||||
opts ...Option) (interface{}, error) {
|
||||
panicChan := &onceChan{channel: make(chan interface{})}
|
||||
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||
}
|
||||
|
||||
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
|
||||
reducer ReducerFunc, opts ...Option) (interface{}, error) {
|
||||
options := buildOptions(opts...)
|
||||
// output is used to write the final result
|
||||
output := make(chan interface{})
|
||||
defer func() {
|
||||
// reducer can only write once, if more, panic
|
||||
for range output {
|
||||
panic("more than one element written in reducer")
|
||||
}
|
||||
}()
|
||||
|
||||
// collector is used to collect data from mapper, and consume in reducer
|
||||
collector := make(chan interface{}, options.workers)
|
||||
// if done is closed, all mappers and reducer should stop processing
|
||||
done := make(chan lang.PlaceholderType)
|
||||
writer := newGuardedWriter(options.ctx, output, done)
|
||||
var closeOnce sync.Once
|
||||
// use atomic.Value to avoid data race
|
||||
var retErr errorx.AtomicError
|
||||
finish := func() {
|
||||
closeOnce.Do(func() {
|
||||
@@ -145,28 +186,41 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
|
||||
go func() {
|
||||
defer func() {
|
||||
drain(collector)
|
||||
|
||||
if r := recover(); r != nil {
|
||||
cancel(fmt.Errorf("%v", r))
|
||||
} else {
|
||||
finish()
|
||||
panicChan.write(r)
|
||||
}
|
||||
finish()
|
||||
}()
|
||||
|
||||
reducer(collector, writer, cancel)
|
||||
}()
|
||||
|
||||
go executeMappers(options.ctx, func(item interface{}, w Writer) {
|
||||
mapper(item, w, cancel)
|
||||
}, source, collector, done, options.workers)
|
||||
go executeMappers(mapperContext{
|
||||
ctx: options.ctx,
|
||||
mapper: func(item interface{}, w Writer) {
|
||||
mapper(item, w, cancel)
|
||||
},
|
||||
source: source,
|
||||
panicChan: panicChan,
|
||||
collector: collector,
|
||||
doneChan: done,
|
||||
workers: options.workers,
|
||||
})
|
||||
|
||||
value, ok := <-output
|
||||
if err := retErr.Load(); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
return value, nil
|
||||
} else {
|
||||
return nil, ErrReduceNoOutput
|
||||
select {
|
||||
case <-options.ctx.Done():
|
||||
cancel(context.DeadlineExceeded)
|
||||
return nil, context.DeadlineExceeded
|
||||
case v := <-panicChan.channel:
|
||||
panic(v)
|
||||
case v, ok := <-output:
|
||||
if err := retErr.Load(); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
return v, nil
|
||||
} else {
|
||||
return nil, ErrReduceNoOutput
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,18 +229,12 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
|
||||
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
|
||||
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
reducer(input, cancel)
|
||||
// We need to write a placeholder to let MapReduce to continue on reducer done,
|
||||
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
|
||||
writer.Write(lang.Placeholder)
|
||||
}, opts...)
|
||||
return err
|
||||
}
|
||||
if errors.Is(err, ErrReduceNoOutput) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// MapVoid maps all elements from given generate but no output.
|
||||
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option) {
|
||||
drain(Map(generate, func(item interface{}, writer Writer) {
|
||||
mapper(item)
|
||||
}, opts...))
|
||||
return err
|
||||
}
|
||||
|
||||
// WithContext customizes a mapreduce processing accepts a given ctx.
|
||||
@@ -216,12 +264,18 @@ func buildOptions(opts ...Option) *mapReduceOptions {
|
||||
return options
|
||||
}
|
||||
|
||||
func buildSource(generate GenerateFunc) chan interface{} {
|
||||
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
|
||||
source := make(chan interface{})
|
||||
threading.GoSafe(func() {
|
||||
defer close(source)
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
panicChan.write(r)
|
||||
}
|
||||
close(source)
|
||||
}()
|
||||
|
||||
generate(source)
|
||||
})
|
||||
}()
|
||||
|
||||
return source
|
||||
}
|
||||
@@ -233,39 +287,43 @@ func drain(channel <-chan interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
|
||||
collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
|
||||
func executeMappers(mCtx mapperContext) {
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
wg.Wait()
|
||||
close(collector)
|
||||
close(mCtx.collector)
|
||||
drain(mCtx.source)
|
||||
}()
|
||||
|
||||
pool := make(chan lang.PlaceholderType, workers)
|
||||
writer := newGuardedWriter(ctx, collector, done)
|
||||
for {
|
||||
var failed int32
|
||||
pool := make(chan lang.PlaceholderType, mCtx.workers)
|
||||
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
|
||||
for atomic.LoadInt32(&failed) == 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-mCtx.ctx.Done():
|
||||
return
|
||||
case <-done:
|
||||
case <-mCtx.doneChan:
|
||||
return
|
||||
case pool <- lang.Placeholder:
|
||||
item, ok := <-input
|
||||
item, ok := <-mCtx.source
|
||||
if !ok {
|
||||
<-pool
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
// better to safely run caller defined method
|
||||
threading.GoSafe(func() {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
atomic.AddInt32(&failed, 1)
|
||||
mCtx.panicChan.write(r)
|
||||
}
|
||||
wg.Done()
|
||||
<-pool
|
||||
}()
|
||||
|
||||
mapper(item, writer)
|
||||
})
|
||||
mCtx.mapper(item, writer)
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -311,3 +369,16 @@ func (gw guardedWriter) Write(v interface{}) {
|
||||
gw.channel <- v
|
||||
}
|
||||
}
|
||||
|
||||
type onceChan struct {
|
||||
channel chan interface{}
|
||||
wrote int32
|
||||
}
|
||||
|
||||
func (oc *onceChan) write(val interface{}) {
|
||||
if atomic.AddInt32(&oc.wrote, 1) > 1 {
|
||||
return
|
||||
}
|
||||
|
||||
oc.channel <- val
|
||||
}
|
||||
|
||||
78
core/mr/mapreduce_fuzz_test.go
Normal file
78
core/mr/mapreduce_fuzz_test.go
Normal file
@@ -0,0 +1,78 @@
|
||||
//go:build go1.18
|
||||
// +build go1.18
|
||||
|
||||
package mr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
func FuzzMapReduce(f *testing.F) {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
f.Add(uint(10), uint(runtime.NumCPU()))
|
||||
f.Fuzz(func(t *testing.T, num uint, workers uint) {
|
||||
n := int64(num)%5000 + 5000
|
||||
genPanic := rand.Intn(100) == 0
|
||||
mapperPanic := rand.Intn(100) == 0
|
||||
reducerPanic := rand.Intn(100) == 0
|
||||
genIdx := rand.Int63n(n)
|
||||
mapperIdx := rand.Int63n(n)
|
||||
reducerIdx := rand.Int63n(n)
|
||||
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||
|
||||
fn := func() (interface{}, error) {
|
||||
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
|
||||
|
||||
return MapReduce(func(source chan<- interface{}) {
|
||||
for i := int64(0); i < n; i++ {
|
||||
source <- i
|
||||
if genPanic && i == genIdx {
|
||||
panic("foo")
|
||||
}
|
||||
}
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int64)
|
||||
if mapperPanic && v == mapperIdx {
|
||||
panic("bar")
|
||||
}
|
||||
writer.Write(v * v)
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var idx int64
|
||||
var total int64
|
||||
for v := range pipe {
|
||||
if reducerPanic && idx == reducerIdx {
|
||||
panic("baz")
|
||||
}
|
||||
total += v.(int64)
|
||||
idx++
|
||||
}
|
||||
writer.Write(total)
|
||||
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||
}
|
||||
|
||||
if genPanic || mapperPanic || reducerPanic {
|
||||
var buf strings.Builder
|
||||
buf.WriteString(fmt.Sprintf("n: %d", n))
|
||||
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
|
||||
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
|
||||
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
|
||||
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
|
||||
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
|
||||
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
|
||||
assert.Panicsf(t, func() { fn() }, buf.String())
|
||||
} else {
|
||||
val, err := fn()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, squareSum, val.(int64))
|
||||
}
|
||||
})
|
||||
}
|
||||
107
core/mr/mapreduce_rand_test.go
Normal file
107
core/mr/mapreduce_rand_test.go
Normal file
@@ -0,0 +1,107 @@
|
||||
//go:build fuzz
|
||||
// +build fuzz
|
||||
|
||||
package mr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
"gopkg.in/cheggaaa/pb.v1"
|
||||
)
|
||||
|
||||
// If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
|
||||
// so we need to simulate the fuzz test in test mode.
|
||||
func TestMapReduceRandom(t *testing.T) {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
const (
|
||||
times = 10000
|
||||
nRange = 500
|
||||
mega = 1024 * 1024
|
||||
)
|
||||
|
||||
bar := pb.New(times).Start()
|
||||
runner := threading.NewTaskRunner(runtime.NumCPU())
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(times)
|
||||
for i := 0; i < times; i++ {
|
||||
runner.Schedule(func() {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
if time.Since(start) > time.Minute {
|
||||
t.Fatal("timeout")
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
||||
n := rand.Int63n(nRange)%nRange + nRange
|
||||
workers := rand.Int()%50 + runtime.NumCPU()/2
|
||||
genPanic := rand.Intn(100) == 0
|
||||
mapperPanic := rand.Intn(100) == 0
|
||||
reducerPanic := rand.Intn(100) == 0
|
||||
genIdx := rand.Int63n(n)
|
||||
mapperIdx := rand.Int63n(n)
|
||||
reducerIdx := rand.Int63n(n)
|
||||
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||
|
||||
fn := func() (interface{}, error) {
|
||||
return MapReduce(func(source chan<- interface{}) {
|
||||
for i := int64(0); i < n; i++ {
|
||||
source <- i
|
||||
if genPanic && i == genIdx {
|
||||
panic("foo")
|
||||
}
|
||||
}
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int64)
|
||||
if mapperPanic && v == mapperIdx {
|
||||
panic("bar")
|
||||
}
|
||||
writer.Write(v * v)
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var idx int64
|
||||
var total int64
|
||||
for v := range pipe {
|
||||
if reducerPanic && idx == reducerIdx {
|
||||
panic("baz")
|
||||
}
|
||||
total += v.(int64)
|
||||
idx++
|
||||
}
|
||||
writer.Write(total)
|
||||
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||
}
|
||||
|
||||
if genPanic || mapperPanic || reducerPanic {
|
||||
var buf strings.Builder
|
||||
buf.WriteString(fmt.Sprintf("n: %d", n))
|
||||
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
|
||||
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
|
||||
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
|
||||
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
|
||||
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
|
||||
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
|
||||
assert.Panicsf(t, func() { fn() }, buf.String())
|
||||
} else {
|
||||
val, err := fn()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, squareSum, val.(int64))
|
||||
}
|
||||
bar.Increment()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
bar.Finish()
|
||||
}
|
||||
@@ -11,8 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/stringx"
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
var errDummy = errors.New("dummy")
|
||||
@@ -22,6 +21,8 @@ func init() {
|
||||
}
|
||||
|
||||
func TestFinish(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var total uint32
|
||||
err := Finish(func() error {
|
||||
atomic.AddUint32(&total, 2)
|
||||
@@ -39,14 +40,20 @@ func TestFinish(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFinishNone(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Nil(t, Finish())
|
||||
}
|
||||
|
||||
func TestFinishVoidNone(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
FinishVoid()
|
||||
}
|
||||
|
||||
func TestFinishErr(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var total uint32
|
||||
err := Finish(func() error {
|
||||
atomic.AddUint32(&total, 2)
|
||||
@@ -63,6 +70,8 @@ func TestFinishErr(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFinishVoid(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var total uint32
|
||||
FinishVoid(func() {
|
||||
atomic.AddUint32(&total, 2)
|
||||
@@ -75,70 +84,107 @@ func TestFinishVoid(t *testing.T) {
|
||||
assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
|
||||
}
|
||||
|
||||
func TestMap(t *testing.T) {
|
||||
tests := []struct {
|
||||
mapper MapFunc
|
||||
expect int
|
||||
}{
|
||||
{
|
||||
mapper: func(item interface{}, writer Writer) {
|
||||
v := item.(int)
|
||||
writer.Write(v * v)
|
||||
},
|
||||
expect: 30,
|
||||
},
|
||||
{
|
||||
mapper: func(item interface{}, writer Writer) {
|
||||
v := item.(int)
|
||||
if v%2 == 0 {
|
||||
return
|
||||
}
|
||||
writer.Write(v * v)
|
||||
},
|
||||
expect: 10,
|
||||
},
|
||||
{
|
||||
mapper: func(item interface{}, writer Writer) {
|
||||
v := item.(int)
|
||||
if v%2 == 0 {
|
||||
panic(v)
|
||||
}
|
||||
writer.Write(v * v)
|
||||
},
|
||||
expect: 10,
|
||||
},
|
||||
}
|
||||
func TestForEach(t *testing.T) {
|
||||
const tasks = 1000
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
||||
channel := Map(func(source chan<- interface{}) {
|
||||
for i := 1; i < 5; i++ {
|
||||
t.Run("all", func(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var count uint32
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item interface{}) {
|
||||
atomic.AddUint32(&count, 1)
|
||||
}, WithWorkers(-1))
|
||||
|
||||
assert.Equal(t, tasks, int(count))
|
||||
})
|
||||
|
||||
t.Run("odd", func(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var count uint32
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item interface{}) {
|
||||
if item.(int)%2 == 0 {
|
||||
atomic.AddUint32(&count, 1)
|
||||
}
|
||||
})
|
||||
|
||||
assert.Equal(t, tasks/2, int(count))
|
||||
})
|
||||
|
||||
t.Run("all", func(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.PanicsWithValue(t, "foo", func() {
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, test.mapper, WithWorkers(-1))
|
||||
|
||||
var result int
|
||||
for v := range channel {
|
||||
result += v.(int)
|
||||
}
|
||||
|
||||
assert.Equal(t, test.expect, result)
|
||||
}, func(item interface{}) {
|
||||
panic("foo")
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestGeneratePanic(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
t.Run("all", func(t *testing.T) {
|
||||
assert.PanicsWithValue(t, "foo", func() {
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
panic("foo")
|
||||
}, func(item interface{}) {
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestMapperPanic(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
const tasks = 1000
|
||||
var run int32
|
||||
t.Run("all", func(t *testing.T) {
|
||||
assert.PanicsWithValue(t, "foo", func() {
|
||||
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
atomic.AddInt32(&run, 1)
|
||||
panic("foo")
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
})
|
||||
})
|
||||
assert.True(t, atomic.LoadInt32(&run) < tasks/2)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMapReduce(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
mapper MapperFunc
|
||||
reducer ReducerFunc
|
||||
expectErr error
|
||||
expectValue interface{}
|
||||
}{
|
||||
{
|
||||
name: "simple",
|
||||
expectErr: nil,
|
||||
expectValue: 30,
|
||||
},
|
||||
{
|
||||
name: "cancel with error",
|
||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
if v%3 == 0 {
|
||||
@@ -149,6 +195,7 @@ func TestMapReduce(t *testing.T) {
|
||||
expectErr: errDummy,
|
||||
},
|
||||
{
|
||||
name: "cancel with nil",
|
||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
if v%3 == 0 {
|
||||
@@ -160,6 +207,7 @@ func TestMapReduce(t *testing.T) {
|
||||
expectValue: nil,
|
||||
},
|
||||
{
|
||||
name: "cancel with more",
|
||||
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
@@ -174,36 +222,74 @@ func TestMapReduce(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
||||
if test.mapper == nil {
|
||||
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
writer.Write(v * v)
|
||||
}
|
||||
}
|
||||
if test.reducer == nil {
|
||||
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
result += item.(int)
|
||||
t.Run("MapReduce", func(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if test.mapper == nil {
|
||||
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
writer.Write(v * v)
|
||||
}
|
||||
writer.Write(result)
|
||||
}
|
||||
}
|
||||
value, err := MapReduce(func(source chan<- interface{}) {
|
||||
for i := 1; i < 5; i++ {
|
||||
source <- i
|
||||
if test.reducer == nil {
|
||||
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
result += item.(int)
|
||||
}
|
||||
writer.Write(result)
|
||||
}
|
||||
}
|
||||
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
|
||||
value, err := MapReduce(func(source chan<- interface{}) {
|
||||
for i := 1; i < 5; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
|
||||
|
||||
assert.Equal(t, test.expectErr, err)
|
||||
assert.Equal(t, test.expectValue, value)
|
||||
})
|
||||
}
|
||||
assert.Equal(t, test.expectErr, err)
|
||||
assert.Equal(t, test.expectValue, value)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("MapReduce", func(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if test.mapper == nil {
|
||||
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
writer.Write(v * v)
|
||||
}
|
||||
}
|
||||
if test.reducer == nil {
|
||||
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
result += item.(int)
|
||||
}
|
||||
writer.Write(result)
|
||||
}
|
||||
}
|
||||
|
||||
source := make(chan interface{})
|
||||
go func() {
|
||||
for i := 1; i < 5; i++ {
|
||||
source <- i
|
||||
}
|
||||
close(source)
|
||||
}()
|
||||
|
||||
value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
|
||||
assert.Equal(t, test.expectErr, err)
|
||||
assert.Equal(t, test.expectValue, value)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
MapReduce(func(source chan<- interface{}) {
|
||||
for i := 0; i < 10; i++ {
|
||||
@@ -220,18 +306,23 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMapReduceVoid(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var value uint32
|
||||
tests := []struct {
|
||||
name string
|
||||
mapper MapperFunc
|
||||
reducer VoidReducerFunc
|
||||
expectValue uint32
|
||||
expectErr error
|
||||
}{
|
||||
{
|
||||
name: "simple",
|
||||
expectValue: 30,
|
||||
expectErr: nil,
|
||||
},
|
||||
{
|
||||
name: "cancel with error",
|
||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
if v%3 == 0 {
|
||||
@@ -242,6 +333,7 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
expectErr: errDummy,
|
||||
},
|
||||
{
|
||||
name: "cancel with nil",
|
||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
if v%3 == 0 {
|
||||
@@ -252,6 +344,7 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
expectErr: ErrCancelWithNil,
|
||||
},
|
||||
{
|
||||
name: "cancel with more",
|
||||
reducer: func(pipe <-chan interface{}, cancel func(error)) {
|
||||
for item := range pipe {
|
||||
result := atomic.AddUint32(&value, uint32(item.(int)))
|
||||
@@ -265,7 +358,7 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
atomic.StoreUint32(&value, 0)
|
||||
|
||||
if test.mapper == nil {
|
||||
@@ -296,6 +389,8 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMapReduceVoidWithDelay(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var result []int
|
||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||
source <- 0
|
||||
@@ -318,38 +413,64 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
|
||||
assert.Equal(t, 0, result[1])
|
||||
}
|
||||
|
||||
func TestMapVoid(t *testing.T) {
|
||||
const tasks = 1000
|
||||
var count uint32
|
||||
MapVoid(func(source chan<- interface{}) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item interface{}) {
|
||||
atomic.AddUint32(&count, 1)
|
||||
})
|
||||
func TestMapReducePanic(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Equal(t, tasks, int(count))
|
||||
assert.Panics(t, func() {
|
||||
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
for range pipe {
|
||||
panic("panic")
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestMapReducePanic(t *testing.T) {
|
||||
v, err := MapReduce(func(source chan<- interface{}) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
for range pipe {
|
||||
panic("panic")
|
||||
}
|
||||
func TestMapReducePanicOnce(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||
for i := 0; i < 100; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
if i == 0 {
|
||||
panic("foo")
|
||||
}
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
for range pipe {
|
||||
panic("bar")
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
panic("foo")
|
||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||
panic("bar")
|
||||
})
|
||||
})
|
||||
assert.Nil(t, v)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, "panic", err.Error())
|
||||
}
|
||||
|
||||
func TestMapReduceVoidCancel(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var result []int
|
||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||
source <- 0
|
||||
@@ -371,13 +492,15 @@ func TestMapReduceVoidCancel(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
||||
var done syncx.AtomicBool
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var done int32
|
||||
var result []int
|
||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
done.Set(true)
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
if i == defaultWorkers/2 {
|
||||
@@ -392,10 +515,12 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
||||
})
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, "anything", err.Error())
|
||||
assert.True(t, done.True())
|
||||
assert.Equal(t, int32(1), done)
|
||||
}
|
||||
|
||||
func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
uids := []int{1, 2, 3}
|
||||
res, err := MapReduce(func(source chan<- interface{}) {
|
||||
for _, uid := range uids {
|
||||
@@ -412,33 +537,54 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMapReduceVoidPanicInReducer(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
const message = "foo"
|
||||
var done syncx.AtomicBool
|
||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||
assert.Panics(t, func() {
|
||||
var done int32
|
||||
_ = MapReduceVoid(func(source chan<- interface{}) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||
panic(message)
|
||||
}, WithWorkers(1))
|
||||
})
|
||||
}
|
||||
|
||||
func TestForEachWithContext(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var done int32
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ForEach(func(source chan<- interface{}) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
done.Set(true)
|
||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item interface{}) {
|
||||
i := item.(int)
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||
panic(message)
|
||||
}, WithWorkers(1))
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, message, err.Error())
|
||||
assert.True(t, done.True())
|
||||
if i == defaultWorkers/2 {
|
||||
cancel()
|
||||
}
|
||||
}, WithContext(ctx))
|
||||
}
|
||||
|
||||
func TestMapReduceWithContext(t *testing.T) {
|
||||
var done syncx.AtomicBool
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var done int32
|
||||
var result []int
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
done.Set(true)
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item interface{}, writer Writer, c func(error)) {
|
||||
i := item.(int)
|
||||
if i == defaultWorkers/2 {
|
||||
@@ -452,7 +598,7 @@ func TestMapReduceWithContext(t *testing.T) {
|
||||
}
|
||||
}, WithContext(ctx))
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, ErrReduceNoOutput, err)
|
||||
assert.Equal(t, context.DeadlineExceeded, err)
|
||||
}
|
||||
|
||||
func BenchmarkMapReduce(b *testing.B) {
|
||||
|
||||
@@ -54,7 +54,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/mr"
|
||||
"github.com/zeromicro/go-zero/core/mr"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
@@ -55,7 +55,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/mr"
|
||||
"github.com/zeromicro/go-zero/core/mr"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -87,4 +87,4 @@ More examples: [https://github.com/zeromicro/zero-examples/tree/main/mapreduce](
|
||||
|
||||
## Give a Star! ⭐
|
||||
|
||||
If you like or are using this project to learn or start your solution, please give it a star. Thanks!
|
||||
If you like or are using this project to learn or start your solution, please give it a star. Thanks!
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
// DefaultMemProfileRate is the default memory profiling rate.
|
||||
|
||||
@@ -10,8 +10,8 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
const timeFormat = "0102150405"
|
||||
|
||||
@@ -8,8 +8,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/olekukonko/tablewriter"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
)
|
||||
|
||||
type (
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package prof
|
||||
|
||||
import "github.com/tal-tech/go-zero/core/utils"
|
||||
import "github.com/zeromicro/go-zero/core/utils"
|
||||
|
||||
type (
|
||||
// A ProfilePoint is a profile time point.
|
||||
|
||||
@@ -3,7 +3,7 @@ package prof
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/utils"
|
||||
"github.com/zeromicro/go-zero/core/utils"
|
||||
)
|
||||
|
||||
func TestProfiler(t *testing.T) {
|
||||
|
||||
30
core/prof/runtime.go
Normal file
30
core/prof/runtime.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package prof
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultInterval = time.Second * 5
|
||||
mega = 1024 * 1024
|
||||
)
|
||||
|
||||
func DisplayStats(interval ...time.Duration) {
|
||||
duration := defaultInterval
|
||||
for _, val := range interval {
|
||||
duration = val
|
||||
}
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(duration)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
var m runtime.MemStats
|
||||
runtime.ReadMemStats(&m)
|
||||
fmt.Printf("Goroutines: %d, Alloc: %vm, TotalAlloc: %vm, Sys: %vm, NumGC: %v\n",
|
||||
runtime.NumGoroutine(), m.Alloc/mega, m.TotalAlloc/mega, m.Sys/mega, m.NumGC)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -6,9 +6,9 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/syncx"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
// ErrNoAvailablePusher indicates no pusher available.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package queue
|
||||
|
||||
import "github.com/tal-tech/go-zero/core/errorx"
|
||||
import "github.com/zeromicro/go-zero/core/errorx"
|
||||
|
||||
// A MultiPusher is a pusher that can push messages to multiple underlying pushers.
|
||||
type MultiPusher struct {
|
||||
|
||||
@@ -6,11 +6,11 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/rescue"
|
||||
"github.com/tal-tech/go-zero/core/stat"
|
||||
"github.com/tal-tech/go-zero/core/threading"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/rescue"
|
||||
"github.com/zeromicro/go-zero/core/stat"
|
||||
"github.com/zeromicro/go-zero/core/threading"
|
||||
"github.com/zeromicro/go-zero/core/timex"
|
||||
)
|
||||
|
||||
const queueName = "queue"
|
||||
|
||||
@@ -6,8 +6,8 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tal-tech/go-zero/core/logx"
|
||||
"github.com/tal-tech/go-zero/core/mathx"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/mathx"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user