Compare commits
41 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7727d70634 | ||
|
|
5f9d101bc6 | ||
|
|
6c2abe7474 | ||
|
|
14a902c1a7 | ||
|
|
5ad6a6d229 | ||
|
|
6f4b97864a | ||
|
|
0e0abc3a95 | ||
|
|
696fda1db4 | ||
|
|
c1d2634427 | ||
|
|
4b7a680ac5 | ||
|
|
b3e7d2901f | ||
|
|
cdf7ec213c | ||
|
|
f1102fb262 | ||
|
|
09d1fad6e0 | ||
|
|
379c65a3ef | ||
|
|
fdc7f64d6f | ||
|
|
df0f8ed59e | ||
|
|
c903966fc7 | ||
|
|
e57fa8ff53 | ||
|
|
bf2feee5b7 | ||
|
|
ce05c429fc | ||
|
|
272a3f347d | ||
|
|
13db7a1931 | ||
|
|
468c237189 | ||
|
|
b9b80c068b | ||
|
|
9b592b3dee | ||
|
|
2203809e5e | ||
|
|
8d6d37f71e | ||
|
|
ea4f2af67f | ||
|
|
53af194ef9 | ||
|
|
5e0e2d2b14 | ||
|
|
74c99184c5 | ||
|
|
eb4b86137a | ||
|
|
9c4f4f3b4e | ||
|
|
240132e7c7 | ||
|
|
9d67fc4cfb | ||
|
|
892f93a716 | ||
|
|
ba6a7c9dc8 | ||
|
|
a91c3907a8 | ||
|
|
e267d94ee1 | ||
|
|
89ce5e492b |
2
.github/workflows/go.yml
vendored
2
.github/workflows/go.yml
vendored
@@ -15,7 +15,7 @@ jobs:
|
|||||||
- name: Set up Go 1.x
|
- name: Set up Go 1.x
|
||||||
uses: actions/setup-go@v2
|
uses: actions/setup-go@v2
|
||||||
with:
|
with:
|
||||||
go-version: ^1.14
|
go-version: ^1.15
|
||||||
id: go
|
id: go
|
||||||
|
|
||||||
- name: Check out code into the Go module directory
|
- name: Check out code into the Go module directory
|
||||||
|
|||||||
18
.github/workflows/issue-translator.yml
vendored
Normal file
18
.github/workflows/issue-translator.yml
vendored
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
name: 'issue-translator'
|
||||||
|
on:
|
||||||
|
issue_comment:
|
||||||
|
types: [created]
|
||||||
|
issues:
|
||||||
|
types: [opened]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: tomsun28/issues-translate-action@v2.6
|
||||||
|
with:
|
||||||
|
IS_MODIFY_TITLE: true
|
||||||
|
# not require, default false, . Decide whether to modify the issue title
|
||||||
|
# if true, the robot account @Issues-translate-bot must have modification permissions, invite @Issues-translate-bot to your project or use your custom bot.
|
||||||
|
CUSTOM_BOT_NOTE: Bot detected the issue body's language is not English, translate it automatically. 👯👭🏻🧑🤝🧑👫🧑🏿🤝🧑🏻👩🏾🤝👨🏿👬🏿
|
||||||
|
# not require. Customize the translation robot prefix message.
|
||||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -16,7 +16,8 @@
|
|||||||
**/logs
|
**/logs
|
||||||
|
|
||||||
# for test purpose
|
# for test purpose
|
||||||
adhoc
|
**/adhoc
|
||||||
|
**/testdata
|
||||||
|
|
||||||
# gitlab ci
|
# gitlab ci
|
||||||
.cache
|
.cache
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ We will help you to contribute in different areas like filing issues, developing
|
|||||||
getting your work reviewed and merged.
|
getting your work reviewed and merged.
|
||||||
|
|
||||||
If you have questions about the development process,
|
If you have questions about the development process,
|
||||||
feel free to [file an issue](https://github.com/tal-tech/go-zero/issues/new/choose).
|
feel free to [file an issue](https://github.com/zeromicro/go-zero/issues/new/choose).
|
||||||
|
|
||||||
## Find something to work on
|
## Find something to work on
|
||||||
|
|
||||||
@@ -50,10 +50,10 @@ Here is how you get started.
|
|||||||
|
|
||||||
### Find a good first topic
|
### Find a good first topic
|
||||||
|
|
||||||
[go-zero](https://github.com/tal-tech/go-zero) has beginner-friendly issues that provide a good first issue.
|
[go-zero](https://github.com/zeromicro/go-zero) has beginner-friendly issues that provide a good first issue.
|
||||||
For example, [go-zero](https://github.com/tal-tech/go-zero) has
|
For example, [go-zero](https://github.com/zeromicro/go-zero) has
|
||||||
[help wanted](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
|
[help wanted](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
|
||||||
[good first issue](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
|
[good first issue](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
|
||||||
labels for issues that should not need deep knowledge of the system.
|
labels for issues that should not need deep knowledge of the system.
|
||||||
We can help new contributors who wish to work on such issues.
|
We can help new contributors who wish to work on such issues.
|
||||||
|
|
||||||
@@ -79,7 +79,7 @@ This is a rough outline of what a contributor's workflow looks like:
|
|||||||
- Create a topic branch from where to base the contribution. This is usually master.
|
- Create a topic branch from where to base the contribution. This is usually master.
|
||||||
- Make commits of logical units.
|
- Make commits of logical units.
|
||||||
- Push changes in a topic branch to a personal fork of the repository.
|
- Push changes in a topic branch to a personal fork of the repository.
|
||||||
- Submit a pull request to [go-zero](https://github.com/tal-tech/go-zero).
|
- Submit a pull request to [go-zero](https://github.com/zeromicro/go-zero).
|
||||||
|
|
||||||
## Creating Pull Requests
|
## Creating Pull Requests
|
||||||
|
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/hash"
|
"github.com/zeromicro/go-zero/core/hash"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
|
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRedisBitSet_New_Set_Test(t *testing.T) {
|
func TestRedisBitSet_New_Set_Test(t *testing.T) {
|
||||||
|
|||||||
@@ -6,11 +6,11 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -7,9 +7,9 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -8,8 +8,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/iox"
|
"github.com/zeromicro/go-zero/core/iox"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestEnterToContinue(t *testing.T) {
|
func TestEnterToContinue(t *testing.T) {
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrPaddingSize indicates bad padding size.
|
// ErrPaddingSize indicates bad padding size.
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -6,9 +6,9 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const duration = time.Millisecond * 50
|
const duration = time.Millisecond * 50
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSafeMap(t *testing.T) {
|
func TestSafeMap(t *testing.T) {
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
package collection
|
package collection
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -5,9 +5,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const drainWorkers = 8
|
const drainWorkers = 8
|
||||||
|
|||||||
@@ -8,10 +8,10 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/mapping"
|
"github.com/zeromicro/go-zero/core/mapping"
|
||||||
)
|
)
|
||||||
|
|
||||||
var loaders = map[string]func([]byte, interface{}) error{
|
var loaders = map[string]func([]byte, interface{}) error{
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
"github.com/tal-tech/go-zero/core/hash"
|
"github.com/zeromicro/go-zero/core/hash"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLoadConfig_notExists(t *testing.T) {
|
func TestLoadConfig_notExists(t *testing.T) {
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/iox"
|
"github.com/zeromicro/go-zero/core/iox"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PropertyError represents a configuration error message.
|
// PropertyError represents a configuration error message.
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestProperties(t *testing.T) {
|
func TestProperties(t *testing.T) {
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package contextx
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/mapping"
|
"github.com/zeromicro/go-zero/core/mapping"
|
||||||
)
|
)
|
||||||
|
|
||||||
const contextTagKey = "ctx"
|
const contextTagKey = "ctx"
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package discov
|
package discov
|
||||||
|
|
||||||
import "github.com/tal-tech/go-zero/core/discov/internal"
|
import "github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
|
|
||||||
// RegisterAccount registers the username/password to the given etcd cluster.
|
// RegisterAccount registers the username/password to the given etcd cluster.
|
||||||
func RegisterAccount(endpoints []string, user, pass string) {
|
func RegisterAccount(endpoints []string, user, pass string) {
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRegisterAccount(t *testing.T) {
|
func TestRegisterAccount(t *testing.T) {
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
var mockLock sync.Mutex
|
var mockLock sync.Mutex
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestAccount(t *testing.T) {
|
func TestAccount(t *testing.T) {
|
||||||
|
|||||||
@@ -9,11 +9,11 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/contextx"
|
"github.com/zeromicro/go-zero/core/contextx"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -7,10 +7,10 @@ import (
|
|||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/contextx"
|
"github.com/zeromicro/go-zero/core/contextx"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
package discov
|
package discov
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -8,10 +8,10 @@ import (
|
|||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -4,9 +4,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/discov/internal"
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A DelayExecutor delays a tasks on given delay interval.
|
// A DelayExecutor delays a tasks on given delay interval.
|
||||||
|
|||||||
@@ -3,8 +3,8 @@ package executors
|
|||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A LessExecutor is an executor to limit execution once within given time interval.
|
// A LessExecutor is an executor to limit execution once within given time interval.
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLessExecutor_DoOrDiscard(t *testing.T) {
|
func TestLessExecutor_DoOrDiscard(t *testing.T) {
|
||||||
|
|||||||
@@ -6,11 +6,11 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const idleRound = 10
|
const idleRound = 10
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const threshold = 10
|
const threshold = 10
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSplitLineChunks(t *testing.T) {
|
func TestSplitLineChunks(t *testing.T) {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRangeReader(t *testing.T) {
|
func TestRangeReader(t *testing.T) {
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/hash"
|
"github.com/zeromicro/go-zero/core/hash"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TempFileWithText creates the temporary file with the given content,
|
// TempFileWithText creates the temporary file with the given content,
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package fx
|
package fx
|
||||||
|
|
||||||
import "github.com/tal-tech/go-zero/core/threading"
|
import "github.com/zeromicro/go-zero/core/threading"
|
||||||
|
|
||||||
// Parallel runs fns parallelly and waits for done.
|
// Parallel runs fns parallelly and waits for done.
|
||||||
func Parallel(fns ...func()) {
|
func Parallel(fns ...func()) {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package fx
|
package fx
|
||||||
|
|
||||||
import "github.com/tal-tech/go-zero/core/errorx"
|
import "github.com/zeromicro/go-zero/core/errorx"
|
||||||
|
|
||||||
const defaultRetryTimes = 3
|
const defaultRetryTimes = 3
|
||||||
|
|
||||||
|
|||||||
@@ -4,9 +4,9 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -13,7 +13,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
|
"go.uber.org/goleak"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestBuffer(t *testing.T) {
|
func TestBuffer(t *testing.T) {
|
||||||
@@ -563,9 +564,6 @@ func equal(t *testing.T, stream Stream, data []interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
|
func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
|
||||||
goroutines := runtime.NumGoroutine()
|
defer goleak.VerifyNone(t)
|
||||||
fn(t)
|
fn(t)
|
||||||
// let scheduler schedule first
|
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
assert.True(t, runtime.NumGoroutine() <= goroutines)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/mapping"
|
"github.com/zeromicro/go-zero/core/mapping"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -9,8 +9,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestReadText(t *testing.T) {
|
func TestReadText(t *testing.T) {
|
||||||
|
|||||||
@@ -5,12 +5,11 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
|
||||||
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
|
const periodScript = `local limit = tonumber(ARGV[1])
|
||||||
periodScript = `local limit = tonumber(ARGV[1])
|
|
||||||
local window = tonumber(ARGV[2])
|
local window = tonumber(ARGV[2])
|
||||||
local current = redis.call("INCRBY", KEYS[1], 1)
|
local current = redis.call("INCRBY", KEYS[1], 1)
|
||||||
if current == 1 then
|
if current == 1 then
|
||||||
@@ -23,8 +22,6 @@ elseif current == limit then
|
|||||||
else
|
else
|
||||||
return 0
|
return 0
|
||||||
end`
|
end`
|
||||||
zoneDiff = 3600 * 8 // GMT+8 for our services
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Unknown means not initialized state.
|
// Unknown means not initialized state.
|
||||||
@@ -104,7 +101,9 @@ func (h *PeriodLimit) Take(key string) (int, error) {
|
|||||||
|
|
||||||
func (h *PeriodLimit) calcExpireSeconds() int {
|
func (h *PeriodLimit) calcExpireSeconds() int {
|
||||||
if h.align {
|
if h.align {
|
||||||
unix := time.Now().Unix() + zoneDiff
|
now := time.Now()
|
||||||
|
_, offset := now.Zone()
|
||||||
|
unix := now.Unix() + int64(offset)
|
||||||
return h.period - int(unix%int64(h.period))
|
return h.period - int(unix%int64(h.period))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,6 +111,8 @@ func (h *PeriodLimit) calcExpireSeconds() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Align returns a func to customize a PeriodLimit with alignment.
|
// Align returns a func to customize a PeriodLimit with alignment.
|
||||||
|
// For example, if we want to limit end users with 5 sms verification messages every day,
|
||||||
|
// we need to align with the local timezone and the start of the day.
|
||||||
func Align() PeriodOption {
|
func Align() PeriodOption {
|
||||||
return func(l *PeriodLimit) {
|
return func(l *PeriodLimit) {
|
||||||
l.align = true
|
l.align = true
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ import (
|
|||||||
|
|
||||||
"github.com/alicebob/miniredis/v2"
|
"github.com/alicebob/miniredis/v2"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
|
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPeriodLimit_Take(t *testing.T) {
|
func TestPeriodLimit_Take(t *testing.T) {
|
||||||
|
|||||||
@@ -7,8 +7,8 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
xrate "golang.org/x/time/rate"
|
xrate "golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -85,8 +85,8 @@ func (lim *TokenLimiter) Allow() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// AllowN reports whether n events may happen at time now.
|
// AllowN reports whether n events may happen at time now.
|
||||||
// Use this method if you intend to drop / skip events that exceed the rate rate.
|
// Use this method if you intend to drop / skip events that exceed the rate.
|
||||||
// Otherwise use Reserve or Wait.
|
// Otherwise, use Reserve or Wait.
|
||||||
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
|
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
|
||||||
return lim.reserveN(now, n)
|
return lim.reserveN(now, n)
|
||||||
}
|
}
|
||||||
@@ -112,7 +112,8 @@ func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
|
|||||||
// Lua boolean false -> r Nil bulk reply
|
// Lua boolean false -> r Nil bulk reply
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
return false
|
return false
|
||||||
} else if err != nil {
|
}
|
||||||
|
if err != nil {
|
||||||
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
|
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
|
||||||
lim.startMonitor()
|
lim.startMonitor()
|
||||||
return lim.rescueLimiter.AllowN(now, n)
|
return lim.rescueLimiter.AllowN(now, n)
|
||||||
|
|||||||
@@ -6,9 +6,9 @@ import (
|
|||||||
|
|
||||||
"github.com/alicebob/miniredis/v2"
|
"github.com/alicebob/miniredis/v2"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
|
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -7,11 +7,11 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -8,11 +8,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/collection"
|
"github.com/zeromicro/go-zero/core/collection"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package load
|
|||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A ShedderGroup is a manager to manage key based shedders.
|
// A ShedderGroup is a manager to manage key based shedders.
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -3,10 +3,11 @@ package logx
|
|||||||
// A LogConf is a logging config.
|
// A LogConf is a logging config.
|
||||||
type LogConf struct {
|
type LogConf struct {
|
||||||
ServiceName string `json:",optional"`
|
ServiceName string `json:",optional"`
|
||||||
Mode string `json:",default=console,options=console|file|volume"`
|
Mode string `json:",default=console,options=[console,file,volume]"`
|
||||||
|
Encoding string `json:",default=json,options=[json,plain]"`
|
||||||
TimeFormat string `json:",optional"`
|
TimeFormat string `json:",optional"`
|
||||||
Path string `json:",default=logs"`
|
Path string `json:",default=logs"`
|
||||||
Level string `json:",default=info,options=info|error|severe"`
|
Level string `json:",default=info,options=[info,error,severe]"`
|
||||||
Compress bool `json:",optional"`
|
Compress bool `json:",optional"`
|
||||||
KeepDays int `json:",optional"`
|
KeepDays int `json:",optional"`
|
||||||
StackCooldownMillis int `json:",default=100"`
|
StackCooldownMillis int `json:",default=100"`
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const durationCallerDepth = 3
|
const durationCallerDepth = 3
|
||||||
@@ -79,10 +79,15 @@ func (l *durationLogger) WithDuration(duration time.Duration) Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *durationLogger) write(writer io.Writer, level string, val interface{}) {
|
func (l *durationLogger) write(writer io.Writer, level string, val interface{}) {
|
||||||
outputJson(writer, &durationLogger{
|
switch encoding {
|
||||||
Timestamp: getTimestamp(),
|
case plainEncodingType:
|
||||||
Level: level,
|
writePlainAny(writer, level, val, l.Duration)
|
||||||
Content: val,
|
default:
|
||||||
Duration: l.Duration,
|
outputJson(writer, &durationLogger{
|
||||||
})
|
Timestamp: getTimestamp(),
|
||||||
|
Level: level,
|
||||||
|
Content: val,
|
||||||
|
Duration: l.Duration,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,6 +37,19 @@ func TestWithDurationInfo(t *testing.T) {
|
|||||||
assert.True(t, strings.Contains(builder.String(), "duration"), builder.String())
|
assert.True(t, strings.Contains(builder.String(), "duration"), builder.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWithDurationInfoConsole(t *testing.T) {
|
||||||
|
old := encoding
|
||||||
|
encoding = plainEncodingType
|
||||||
|
defer func() {
|
||||||
|
encoding = old
|
||||||
|
}()
|
||||||
|
|
||||||
|
var builder strings.Builder
|
||||||
|
log.SetOutput(&builder)
|
||||||
|
WithDuration(time.Second).Info("foo")
|
||||||
|
assert.True(t, strings.Contains(builder.String(), "ms"), builder.String())
|
||||||
|
}
|
||||||
|
|
||||||
func TestWithDurationInfof(t *testing.T) {
|
func TestWithDurationInfof(t *testing.T) {
|
||||||
var builder strings.Builder
|
var builder strings.Builder
|
||||||
log.SetOutput(&builder)
|
log.SetOutput(&builder)
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
type limitedExecutor struct {
|
type limitedExecutor struct {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLimitedExecutor_logOrDiscard(t *testing.T) {
|
func TestLimitedExecutor_logOrDiscard(t *testing.T) {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package logx
|
package logx
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -17,9 +18,9 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/iox"
|
"github.com/zeromicro/go-zero/core/iox"
|
||||||
"github.com/tal-tech/go-zero/core/sysx"
|
"github.com/zeromicro/go-zero/core/sysx"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -31,6 +32,15 @@ const (
|
|||||||
SevereLevel
|
SevereLevel
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
jsonEncodingType = iota
|
||||||
|
plainEncodingType
|
||||||
|
|
||||||
|
jsonEncoding = "json"
|
||||||
|
plainEncoding = "plain"
|
||||||
|
plainEncodingSep = '\t'
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
accessFilename = "access.log"
|
accessFilename = "access.log"
|
||||||
errorFilename = "error.log"
|
errorFilename = "error.log"
|
||||||
@@ -65,6 +75,7 @@ var (
|
|||||||
timeFormat = "2006-01-02T15:04:05.000Z07"
|
timeFormat = "2006-01-02T15:04:05.000Z07"
|
||||||
writeConsole bool
|
writeConsole bool
|
||||||
logLevel uint32
|
logLevel uint32
|
||||||
|
encoding = jsonEncodingType
|
||||||
// use uint32 for atomic operations
|
// use uint32 for atomic operations
|
||||||
disableStat uint32
|
disableStat uint32
|
||||||
infoLog io.WriteCloser
|
infoLog io.WriteCloser
|
||||||
@@ -124,6 +135,12 @@ func SetUp(c LogConf) error {
|
|||||||
if len(c.TimeFormat) > 0 {
|
if len(c.TimeFormat) > 0 {
|
||||||
timeFormat = c.TimeFormat
|
timeFormat = c.TimeFormat
|
||||||
}
|
}
|
||||||
|
switch c.Encoding {
|
||||||
|
case plainEncoding:
|
||||||
|
encoding = plainEncodingType
|
||||||
|
default:
|
||||||
|
encoding = jsonEncodingType
|
||||||
|
}
|
||||||
|
|
||||||
switch c.Mode {
|
switch c.Mode {
|
||||||
case consoleMode:
|
case consoleMode:
|
||||||
@@ -407,21 +424,31 @@ func infoTextSync(msg string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func outputAny(writer io.Writer, level string, val interface{}) {
|
func outputAny(writer io.Writer, level string, val interface{}) {
|
||||||
info := logEntry{
|
switch encoding {
|
||||||
Timestamp: getTimestamp(),
|
case plainEncodingType:
|
||||||
Level: level,
|
writePlainAny(writer, level, val)
|
||||||
Content: val,
|
default:
|
||||||
|
info := logEntry{
|
||||||
|
Timestamp: getTimestamp(),
|
||||||
|
Level: level,
|
||||||
|
Content: val,
|
||||||
|
}
|
||||||
|
outputJson(writer, info)
|
||||||
}
|
}
|
||||||
outputJson(writer, info)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func outputText(writer io.Writer, level, msg string) {
|
func outputText(writer io.Writer, level, msg string) {
|
||||||
info := logEntry{
|
switch encoding {
|
||||||
Timestamp: getTimestamp(),
|
case plainEncodingType:
|
||||||
Level: level,
|
writePlainText(writer, level, msg)
|
||||||
Content: msg,
|
default:
|
||||||
|
info := logEntry{
|
||||||
|
Timestamp: getTimestamp(),
|
||||||
|
Level: level,
|
||||||
|
Content: msg,
|
||||||
|
}
|
||||||
|
outputJson(writer, info)
|
||||||
}
|
}
|
||||||
outputJson(writer, info)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func outputError(writer io.Writer, msg string, callDepth int) {
|
func outputError(writer io.Writer, msg string, callDepth int) {
|
||||||
@@ -565,6 +592,62 @@ func statSync(msg string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writePlainAny(writer io.Writer, level string, val interface{}, fields ...string) {
|
||||||
|
switch v := val.(type) {
|
||||||
|
case string:
|
||||||
|
writePlainText(writer, level, v, fields...)
|
||||||
|
case error:
|
||||||
|
writePlainText(writer, level, v.Error(), fields...)
|
||||||
|
case fmt.Stringer:
|
||||||
|
writePlainText(writer, level, v.String(), fields...)
|
||||||
|
default:
|
||||||
|
var buf bytes.Buffer
|
||||||
|
buf.WriteString(getTimestamp())
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
buf.WriteString(level)
|
||||||
|
for _, item := range fields {
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
buf.WriteString(item)
|
||||||
|
}
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
if err := json.NewEncoder(&buf).Encode(val); err != nil {
|
||||||
|
log.Println(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
buf.WriteByte('\n')
|
||||||
|
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
|
||||||
|
log.Println(buf.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := writer.Write(buf.Bytes()); err != nil {
|
||||||
|
log.Println(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func writePlainText(writer io.Writer, level, msg string, fields ...string) {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
buf.WriteString(getTimestamp())
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
buf.WriteString(level)
|
||||||
|
for _, item := range fields {
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
buf.WriteString(item)
|
||||||
|
}
|
||||||
|
buf.WriteByte(plainEncodingSep)
|
||||||
|
buf.WriteString(msg)
|
||||||
|
buf.WriteByte('\n')
|
||||||
|
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
|
||||||
|
log.Println(buf.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := writer.Write(buf.Bytes()); err != nil {
|
||||||
|
log.Println(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type logWriter struct {
|
type logWriter struct {
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -141,6 +141,78 @@ func TestStructedLogInfov(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStructedLogInfoConsoleAny(t *testing.T) {
|
||||||
|
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||||
|
infoLog = writer
|
||||||
|
}, func(v ...interface{}) {
|
||||||
|
old := encoding
|
||||||
|
encoding = plainEncodingType
|
||||||
|
defer func() {
|
||||||
|
encoding = old
|
||||||
|
}()
|
||||||
|
|
||||||
|
Infov(v)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructedLogInfoConsoleAnyString(t *testing.T) {
|
||||||
|
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||||
|
infoLog = writer
|
||||||
|
}, func(v ...interface{}) {
|
||||||
|
old := encoding
|
||||||
|
encoding = plainEncodingType
|
||||||
|
defer func() {
|
||||||
|
encoding = old
|
||||||
|
}()
|
||||||
|
|
||||||
|
Infov(fmt.Sprint(v...))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructedLogInfoConsoleAnyError(t *testing.T) {
|
||||||
|
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||||
|
infoLog = writer
|
||||||
|
}, func(v ...interface{}) {
|
||||||
|
old := encoding
|
||||||
|
encoding = plainEncodingType
|
||||||
|
defer func() {
|
||||||
|
encoding = old
|
||||||
|
}()
|
||||||
|
|
||||||
|
Infov(errors.New(fmt.Sprint(v...)))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructedLogInfoConsoleAnyStringer(t *testing.T) {
|
||||||
|
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||||
|
infoLog = writer
|
||||||
|
}, func(v ...interface{}) {
|
||||||
|
old := encoding
|
||||||
|
encoding = plainEncodingType
|
||||||
|
defer func() {
|
||||||
|
encoding = old
|
||||||
|
}()
|
||||||
|
|
||||||
|
Infov(ValStringer{
|
||||||
|
val: fmt.Sprint(v...),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructedLogInfoConsoleText(t *testing.T) {
|
||||||
|
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
|
||||||
|
infoLog = writer
|
||||||
|
}, func(v ...interface{}) {
|
||||||
|
old := encoding
|
||||||
|
encoding = plainEncodingType
|
||||||
|
defer func() {
|
||||||
|
encoding = old
|
||||||
|
}()
|
||||||
|
|
||||||
|
Info(fmt.Sprint(v...))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestStructedLogSlow(t *testing.T) {
|
func TestStructedLogSlow(t *testing.T) {
|
||||||
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
|
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
|
||||||
slowLog = writer
|
slowLog = writer
|
||||||
@@ -432,6 +504,17 @@ func doTestStructedLog(t *testing.T, level string, setup func(writer io.WriteClo
|
|||||||
assert.True(t, strings.Contains(val, message))
|
assert.True(t, strings.Contains(val, message))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func doTestStructedLogConsole(t *testing.T, setup func(writer io.WriteCloser),
|
||||||
|
write func(...interface{})) {
|
||||||
|
const message = "hello there"
|
||||||
|
writer := new(mockWriter)
|
||||||
|
setup(writer)
|
||||||
|
atomic.StoreUint32(&initialized, 1)
|
||||||
|
write(message)
|
||||||
|
println(writer.String())
|
||||||
|
assert.True(t, strings.Contains(writer.String(), message))
|
||||||
|
}
|
||||||
|
|
||||||
func testSetLevelTwiceWithMode(t *testing.T, mode string) {
|
func testSetLevelTwiceWithMode(t *testing.T, mode string) {
|
||||||
SetUp(LogConf{
|
SetUp(LogConf{
|
||||||
Mode: mode,
|
Mode: mode,
|
||||||
@@ -456,3 +539,11 @@ func testSetLevelTwiceWithMode(t *testing.T, mode string) {
|
|||||||
ErrorStackf(message)
|
ErrorStackf(message)
|
||||||
assert.Equal(t, 0, writer.builder.Len())
|
assert.Equal(t, 0, writer.builder.Len())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ValStringer struct {
|
||||||
|
val string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v ValStringer) String() string {
|
||||||
|
return v.val
|
||||||
|
}
|
||||||
|
|||||||
@@ -13,9 +13,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/fs"
|
"github.com/zeromicro/go-zero/core/fs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDailyRotateRuleMarkRotated(t *testing.T) {
|
func TestDailyRotateRuleMarkRotated(t *testing.T) {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -77,16 +77,24 @@ func (l *traceLogger) WithDuration(duration time.Duration) Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *traceLogger) write(writer io.Writer, level string, val interface{}) {
|
func (l *traceLogger) write(writer io.Writer, level string, val interface{}) {
|
||||||
outputJson(writer, &traceLogger{
|
traceID := traceIdFromContext(l.ctx)
|
||||||
logEntry: logEntry{
|
spanID := spanIdFromContext(l.ctx)
|
||||||
Timestamp: getTimestamp(),
|
|
||||||
Level: level,
|
switch encoding {
|
||||||
Duration: l.Duration,
|
case plainEncodingType:
|
||||||
Content: val,
|
writePlainAny(writer, level, val, l.Duration, traceID, spanID)
|
||||||
},
|
default:
|
||||||
Trace: traceIdFromContext(l.ctx),
|
outputJson(writer, &traceLogger{
|
||||||
Span: spanIdFromContext(l.ctx),
|
logEntry: logEntry{
|
||||||
})
|
Timestamp: getTimestamp(),
|
||||||
|
Level: level,
|
||||||
|
Duration: l.Duration,
|
||||||
|
Content: val,
|
||||||
|
},
|
||||||
|
Trace: traceID,
|
||||||
|
Span: spanID,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithContext sets ctx to log, for keeping tracing information.
|
// WithContext sets ctx to log, for keeping tracing information.
|
||||||
|
|||||||
@@ -82,6 +82,37 @@ func TestTraceInfo(t *testing.T) {
|
|||||||
assert.True(t, strings.Contains(buf.String(), spanKey))
|
assert.True(t, strings.Contains(buf.String(), spanKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTraceInfoConsole(t *testing.T) {
|
||||||
|
old := encoding
|
||||||
|
encoding = plainEncodingType
|
||||||
|
defer func() {
|
||||||
|
encoding = old
|
||||||
|
}()
|
||||||
|
|
||||||
|
var buf mockWriter
|
||||||
|
atomic.StoreUint32(&initialized, 1)
|
||||||
|
infoLog = newLogWriter(log.New(&buf, "", flags))
|
||||||
|
otp := otel.GetTracerProvider()
|
||||||
|
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
|
||||||
|
otel.SetTracerProvider(tp)
|
||||||
|
defer otel.SetTracerProvider(otp)
|
||||||
|
|
||||||
|
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
|
||||||
|
l := WithContext(ctx).(*traceLogger)
|
||||||
|
SetLevel(InfoLevel)
|
||||||
|
l.WithDuration(time.Second).Info(testlog)
|
||||||
|
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
|
||||||
|
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
|
||||||
|
buf.Reset()
|
||||||
|
l.WithDuration(time.Second).Infof(testlog)
|
||||||
|
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
|
||||||
|
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
|
||||||
|
buf.Reset()
|
||||||
|
l.WithDuration(time.Second).Infov(testlog)
|
||||||
|
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
|
||||||
|
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
|
||||||
|
}
|
||||||
|
|
||||||
func TestTraceSlow(t *testing.T) {
|
func TestTraceSlow(t *testing.T) {
|
||||||
var buf mockWriter
|
var buf mockWriter
|
||||||
atomic.StoreUint32(&initialized, 1)
|
atomic.StoreUint32(&initialized, 1)
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package mapping
|
|||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/jsonx"
|
"github.com/zeromicro/go-zero/core/jsonx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const jsonTagKey = "json"
|
const jsonTagKey = "json"
|
||||||
|
|||||||
@@ -9,9 +9,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/jsonx"
|
"github.com/zeromicro/go-zero/core/jsonx"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -742,7 +742,9 @@ func getValueWithChainedKeys(m Valuer, keys []string) (interface{}, bool) {
|
|||||||
if len(keys) == 1 {
|
if len(keys) == 1 {
|
||||||
v, ok := m.Value(keys[0])
|
v, ok := m.Value(keys[0])
|
||||||
return v, ok
|
return v, ok
|
||||||
} else if len(keys) > 1 {
|
}
|
||||||
|
|
||||||
|
if len(keys) > 1 {
|
||||||
if v, ok := m.Value(keys[0]); ok {
|
if v, ok := m.Value(keys[0]); ok {
|
||||||
if nextm, ok := v.(map[string]interface{}); ok {
|
if nextm, ok := v.(map[string]interface{}); ok {
|
||||||
return getValueWithChainedKeys(MapValuer(nextm), keys[1:])
|
return getValueWithChainedKeys(MapValuer(nextm), keys[1:])
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// because json.Number doesn't support strconv.ParseUint(...),
|
// because json.Number doesn't support strconv.ParseUint(...),
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"github.com/zeromicro/go-zero/core/stringx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMaxInt(t *testing.T) {
|
func TestMaxInt(t *testing.T) {
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package metric
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
prom "github.com/prometheus/client_golang/prometheus"
|
prom "github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package metric
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
prom "github.com/prometheus/client_golang/prometheus"
|
prom "github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package metric
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
prom "github.com/prometheus/client_golang/prometheus"
|
prom "github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/tal-tech/go-zero/core/proc"
|
"github.com/zeromicro/go-zero/core/proc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -3,12 +3,11 @@ package mr
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/errorx"
|
"github.com/zeromicro/go-zero/core/errorx"
|
||||||
"github.com/tal-tech/go-zero/core/lang"
|
"github.com/zeromicro/go-zero/core/lang"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -24,12 +23,12 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
// ForEachFunc is used to do element processing, but no output.
|
||||||
|
ForEachFunc func(item interface{})
|
||||||
// GenerateFunc is used to let callers send elements into source.
|
// GenerateFunc is used to let callers send elements into source.
|
||||||
GenerateFunc func(source chan<- interface{})
|
GenerateFunc func(source chan<- interface{})
|
||||||
// MapFunc is used to do element processing and write the output to writer.
|
// MapFunc is used to do element processing and write the output to writer.
|
||||||
MapFunc func(item interface{}, writer Writer)
|
MapFunc func(item interface{}, writer Writer)
|
||||||
// VoidMapFunc is used to do element processing, but no output.
|
|
||||||
VoidMapFunc func(item interface{})
|
|
||||||
// MapperFunc is used to do element processing and write the output to writer,
|
// MapperFunc is used to do element processing and write the output to writer,
|
||||||
// use cancel func to cancel the processing.
|
// use cancel func to cancel the processing.
|
||||||
MapperFunc func(item interface{}, writer Writer, cancel func(error))
|
MapperFunc func(item interface{}, writer Writer, cancel func(error))
|
||||||
@@ -42,6 +41,16 @@ type (
|
|||||||
// Option defines the method to customize the mapreduce.
|
// Option defines the method to customize the mapreduce.
|
||||||
Option func(opts *mapReduceOptions)
|
Option func(opts *mapReduceOptions)
|
||||||
|
|
||||||
|
mapperContext struct {
|
||||||
|
ctx context.Context
|
||||||
|
mapper MapFunc
|
||||||
|
source <-chan interface{}
|
||||||
|
panicChan *onceChan
|
||||||
|
collector chan<- interface{}
|
||||||
|
doneChan <-chan lang.PlaceholderType
|
||||||
|
workers int
|
||||||
|
}
|
||||||
|
|
||||||
mapReduceOptions struct {
|
mapReduceOptions struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
workers int
|
workers int
|
||||||
@@ -69,7 +78,6 @@ func Finish(fns ...func() error) error {
|
|||||||
cancel(err)
|
cancel(err)
|
||||||
}
|
}
|
||||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||||
drain(pipe)
|
|
||||||
}, WithWorkers(len(fns)))
|
}, WithWorkers(len(fns)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -79,7 +87,7 @@ func FinishVoid(fns ...func()) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
MapVoid(func(source chan<- interface{}) {
|
ForEach(func(source chan<- interface{}) {
|
||||||
for _, fn := range fns {
|
for _, fn := range fns {
|
||||||
source <- fn
|
source <- fn
|
||||||
}
|
}
|
||||||
@@ -89,41 +97,74 @@ func FinishVoid(fns ...func()) {
|
|||||||
}, WithWorkers(len(fns)))
|
}, WithWorkers(len(fns)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Map maps all elements generated from given generate func, and returns an output channel.
|
// ForEach maps all elements from given generate but no output.
|
||||||
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
|
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
|
||||||
options := buildOptions(opts...)
|
options := buildOptions(opts...)
|
||||||
source := buildSource(generate)
|
panicChan := &onceChan{channel: make(chan interface{})}
|
||||||
|
source := buildSource(generate, panicChan)
|
||||||
collector := make(chan interface{}, options.workers)
|
collector := make(chan interface{}, options.workers)
|
||||||
done := make(chan lang.PlaceholderType)
|
done := make(chan lang.PlaceholderType)
|
||||||
|
|
||||||
go executeMappers(options.ctx, mapper, source, collector, done, options.workers)
|
go executeMappers(mapperContext{
|
||||||
|
ctx: options.ctx,
|
||||||
|
mapper: func(item interface{}, writer Writer) {
|
||||||
|
mapper(item)
|
||||||
|
},
|
||||||
|
source: source,
|
||||||
|
panicChan: panicChan,
|
||||||
|
collector: collector,
|
||||||
|
doneChan: done,
|
||||||
|
workers: options.workers,
|
||||||
|
})
|
||||||
|
|
||||||
return collector
|
for {
|
||||||
|
select {
|
||||||
|
case v := <-panicChan.channel:
|
||||||
|
panic(v)
|
||||||
|
case _, ok := <-collector:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MapReduce maps all elements generated from given generate func,
|
// MapReduce maps all elements generated from given generate func,
|
||||||
// and reduces the output elements with given reducer.
|
// and reduces the output elements with given reducer.
|
||||||
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
|
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
|
||||||
opts ...Option) (interface{}, error) {
|
opts ...Option) (interface{}, error) {
|
||||||
source := buildSource(generate)
|
panicChan := &onceChan{channel: make(chan interface{})}
|
||||||
return MapReduceWithSource(source, mapper, reducer, opts...)
|
source := buildSource(generate, panicChan)
|
||||||
|
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer.
|
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||||
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
|
||||||
opts ...Option) (interface{}, error) {
|
opts ...Option) (interface{}, error) {
|
||||||
|
panicChan := &onceChan{channel: make(chan interface{})}
|
||||||
|
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||||
|
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
|
||||||
|
reducer ReducerFunc, opts ...Option) (interface{}, error) {
|
||||||
options := buildOptions(opts...)
|
options := buildOptions(opts...)
|
||||||
|
// output is used to write the final result
|
||||||
output := make(chan interface{})
|
output := make(chan interface{})
|
||||||
defer func() {
|
defer func() {
|
||||||
|
// reducer can only write once, if more, panic
|
||||||
for range output {
|
for range output {
|
||||||
panic("more than one element written in reducer")
|
panic("more than one element written in reducer")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// collector is used to collect data from mapper, and consume in reducer
|
||||||
collector := make(chan interface{}, options.workers)
|
collector := make(chan interface{}, options.workers)
|
||||||
|
// if done is closed, all mappers and reducer should stop processing
|
||||||
done := make(chan lang.PlaceholderType)
|
done := make(chan lang.PlaceholderType)
|
||||||
writer := newGuardedWriter(options.ctx, output, done)
|
writer := newGuardedWriter(options.ctx, output, done)
|
||||||
var closeOnce sync.Once
|
var closeOnce sync.Once
|
||||||
|
// use atomic.Value to avoid data race
|
||||||
var retErr errorx.AtomicError
|
var retErr errorx.AtomicError
|
||||||
finish := func() {
|
finish := func() {
|
||||||
closeOnce.Do(func() {
|
closeOnce.Do(func() {
|
||||||
@@ -145,28 +186,41 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
|
|||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
drain(collector)
|
drain(collector)
|
||||||
|
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
cancel(fmt.Errorf("%v", r))
|
panicChan.write(r)
|
||||||
} else {
|
|
||||||
finish()
|
|
||||||
}
|
}
|
||||||
|
finish()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
reducer(collector, writer, cancel)
|
reducer(collector, writer, cancel)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go executeMappers(options.ctx, func(item interface{}, w Writer) {
|
go executeMappers(mapperContext{
|
||||||
mapper(item, w, cancel)
|
ctx: options.ctx,
|
||||||
}, source, collector, done, options.workers)
|
mapper: func(item interface{}, w Writer) {
|
||||||
|
mapper(item, w, cancel)
|
||||||
|
},
|
||||||
|
source: source,
|
||||||
|
panicChan: panicChan,
|
||||||
|
collector: collector,
|
||||||
|
doneChan: done,
|
||||||
|
workers: options.workers,
|
||||||
|
})
|
||||||
|
|
||||||
value, ok := <-output
|
select {
|
||||||
if err := retErr.Load(); err != nil {
|
case <-options.ctx.Done():
|
||||||
return nil, err
|
cancel(context.DeadlineExceeded)
|
||||||
} else if ok {
|
return nil, context.DeadlineExceeded
|
||||||
return value, nil
|
case v := <-panicChan.channel:
|
||||||
} else {
|
panic(v)
|
||||||
return nil, ErrReduceNoOutput
|
case v, ok := <-output:
|
||||||
|
if err := retErr.Load(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if ok {
|
||||||
|
return v, nil
|
||||||
|
} else {
|
||||||
|
return nil, ErrReduceNoOutput
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -175,18 +229,12 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
|
|||||||
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
|
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
|
||||||
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
|
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
reducer(input, cancel)
|
reducer(input, cancel)
|
||||||
// We need to write a placeholder to let MapReduce to continue on reducer done,
|
|
||||||
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
|
|
||||||
writer.Write(lang.Placeholder)
|
|
||||||
}, opts...)
|
}, opts...)
|
||||||
return err
|
if errors.Is(err, ErrReduceNoOutput) {
|
||||||
}
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// MapVoid maps all elements from given generate but no output.
|
return err
|
||||||
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option) {
|
|
||||||
drain(Map(generate, func(item interface{}, writer Writer) {
|
|
||||||
mapper(item)
|
|
||||||
}, opts...))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithContext customizes a mapreduce processing accepts a given ctx.
|
// WithContext customizes a mapreduce processing accepts a given ctx.
|
||||||
@@ -216,12 +264,18 @@ func buildOptions(opts ...Option) *mapReduceOptions {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildSource(generate GenerateFunc) chan interface{} {
|
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
|
||||||
source := make(chan interface{})
|
source := make(chan interface{})
|
||||||
threading.GoSafe(func() {
|
go func() {
|
||||||
defer close(source)
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
panicChan.write(r)
|
||||||
|
}
|
||||||
|
close(source)
|
||||||
|
}()
|
||||||
|
|
||||||
generate(source)
|
generate(source)
|
||||||
})
|
}()
|
||||||
|
|
||||||
return source
|
return source
|
||||||
}
|
}
|
||||||
@@ -233,39 +287,43 @@ func drain(channel <-chan interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
|
func executeMappers(mCtx mapperContext) {
|
||||||
collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
defer func() {
|
defer func() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
close(collector)
|
close(mCtx.collector)
|
||||||
|
drain(mCtx.source)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
pool := make(chan lang.PlaceholderType, workers)
|
var failed int32
|
||||||
writer := newGuardedWriter(ctx, collector, done)
|
pool := make(chan lang.PlaceholderType, mCtx.workers)
|
||||||
for {
|
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
|
||||||
|
for atomic.LoadInt32(&failed) == 0 {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-mCtx.ctx.Done():
|
||||||
return
|
return
|
||||||
case <-done:
|
case <-mCtx.doneChan:
|
||||||
return
|
return
|
||||||
case pool <- lang.Placeholder:
|
case pool <- lang.Placeholder:
|
||||||
item, ok := <-input
|
item, ok := <-mCtx.source
|
||||||
if !ok {
|
if !ok {
|
||||||
<-pool
|
<-pool
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
// better to safely run caller defined method
|
go func() {
|
||||||
threading.GoSafe(func() {
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
atomic.AddInt32(&failed, 1)
|
||||||
|
mCtx.panicChan.write(r)
|
||||||
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
<-pool
|
<-pool
|
||||||
}()
|
}()
|
||||||
|
|
||||||
mapper(item, writer)
|
mCtx.mapper(item, writer)
|
||||||
})
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -311,3 +369,16 @@ func (gw guardedWriter) Write(v interface{}) {
|
|||||||
gw.channel <- v
|
gw.channel <- v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type onceChan struct {
|
||||||
|
channel chan interface{}
|
||||||
|
wrote int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (oc *onceChan) write(val interface{}) {
|
||||||
|
if atomic.AddInt32(&oc.wrote, 1) > 1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
oc.channel <- val
|
||||||
|
}
|
||||||
|
|||||||
78
core/mr/mapreduce_fuzz_test.go
Normal file
78
core/mr/mapreduce_fuzz_test.go
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
//go:build go1.18
|
||||||
|
// +build go1.18
|
||||||
|
|
||||||
|
package mr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"go.uber.org/goleak"
|
||||||
|
)
|
||||||
|
|
||||||
|
func FuzzMapReduce(f *testing.F) {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
f.Add(uint(10), uint(runtime.NumCPU()))
|
||||||
|
f.Fuzz(func(t *testing.T, num uint, workers uint) {
|
||||||
|
n := int64(num)%5000 + 5000
|
||||||
|
genPanic := rand.Intn(100) == 0
|
||||||
|
mapperPanic := rand.Intn(100) == 0
|
||||||
|
reducerPanic := rand.Intn(100) == 0
|
||||||
|
genIdx := rand.Int63n(n)
|
||||||
|
mapperIdx := rand.Int63n(n)
|
||||||
|
reducerIdx := rand.Int63n(n)
|
||||||
|
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||||
|
|
||||||
|
fn := func() (interface{}, error) {
|
||||||
|
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
|
||||||
|
|
||||||
|
return MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
source <- i
|
||||||
|
if genPanic && i == genIdx {
|
||||||
|
panic("foo")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
v := item.(int64)
|
||||||
|
if mapperPanic && v == mapperIdx {
|
||||||
|
panic("bar")
|
||||||
|
}
|
||||||
|
writer.Write(v * v)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
var idx int64
|
||||||
|
var total int64
|
||||||
|
for v := range pipe {
|
||||||
|
if reducerPanic && idx == reducerIdx {
|
||||||
|
panic("baz")
|
||||||
|
}
|
||||||
|
total += v.(int64)
|
||||||
|
idx++
|
||||||
|
}
|
||||||
|
writer.Write(total)
|
||||||
|
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||||
|
}
|
||||||
|
|
||||||
|
if genPanic || mapperPanic || reducerPanic {
|
||||||
|
var buf strings.Builder
|
||||||
|
buf.WriteString(fmt.Sprintf("n: %d", n))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
|
||||||
|
assert.Panicsf(t, func() { fn() }, buf.String())
|
||||||
|
} else {
|
||||||
|
val, err := fn()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, squareSum, val.(int64))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
107
core/mr/mapreduce_rand_test.go
Normal file
107
core/mr/mapreduce_rand_test.go
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
//go:build fuzz
|
||||||
|
// +build fuzz
|
||||||
|
|
||||||
|
package mr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
|
"gopkg.in/cheggaaa/pb.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
|
||||||
|
// so we need to simulate the fuzz test in test mode.
|
||||||
|
func TestMapReduceRandom(t *testing.T) {
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
const (
|
||||||
|
times = 10000
|
||||||
|
nRange = 500
|
||||||
|
mega = 1024 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
bar := pb.New(times).Start()
|
||||||
|
runner := threading.NewTaskRunner(runtime.NumCPU())
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(times)
|
||||||
|
for i := 0; i < times; i++ {
|
||||||
|
runner.Schedule(func() {
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
if time.Since(start) > time.Minute {
|
||||||
|
t.Fatal("timeout")
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
||||||
|
n := rand.Int63n(nRange)%nRange + nRange
|
||||||
|
workers := rand.Int()%50 + runtime.NumCPU()/2
|
||||||
|
genPanic := rand.Intn(100) == 0
|
||||||
|
mapperPanic := rand.Intn(100) == 0
|
||||||
|
reducerPanic := rand.Intn(100) == 0
|
||||||
|
genIdx := rand.Int63n(n)
|
||||||
|
mapperIdx := rand.Int63n(n)
|
||||||
|
reducerIdx := rand.Int63n(n)
|
||||||
|
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||||
|
|
||||||
|
fn := func() (interface{}, error) {
|
||||||
|
return MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := int64(0); i < n; i++ {
|
||||||
|
source <- i
|
||||||
|
if genPanic && i == genIdx {
|
||||||
|
panic("foo")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
v := item.(int64)
|
||||||
|
if mapperPanic && v == mapperIdx {
|
||||||
|
panic("bar")
|
||||||
|
}
|
||||||
|
writer.Write(v * v)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
var idx int64
|
||||||
|
var total int64
|
||||||
|
for v := range pipe {
|
||||||
|
if reducerPanic && idx == reducerIdx {
|
||||||
|
panic("baz")
|
||||||
|
}
|
||||||
|
total += v.(int64)
|
||||||
|
idx++
|
||||||
|
}
|
||||||
|
writer.Write(total)
|
||||||
|
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||||
|
}
|
||||||
|
|
||||||
|
if genPanic || mapperPanic || reducerPanic {
|
||||||
|
var buf strings.Builder
|
||||||
|
buf.WriteString(fmt.Sprintf("n: %d", n))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
|
||||||
|
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
|
||||||
|
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
|
||||||
|
assert.Panicsf(t, func() { fn() }, buf.String())
|
||||||
|
} else {
|
||||||
|
val, err := fn()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, squareSum, val.(int64))
|
||||||
|
}
|
||||||
|
bar.Increment()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
bar.Finish()
|
||||||
|
}
|
||||||
@@ -11,8 +11,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/stringx"
|
"go.uber.org/goleak"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var errDummy = errors.New("dummy")
|
var errDummy = errors.New("dummy")
|
||||||
@@ -22,6 +21,8 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestFinish(t *testing.T) {
|
func TestFinish(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var total uint32
|
var total uint32
|
||||||
err := Finish(func() error {
|
err := Finish(func() error {
|
||||||
atomic.AddUint32(&total, 2)
|
atomic.AddUint32(&total, 2)
|
||||||
@@ -39,14 +40,20 @@ func TestFinish(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestFinishNone(t *testing.T) {
|
func TestFinishNone(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
assert.Nil(t, Finish())
|
assert.Nil(t, Finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFinishVoidNone(t *testing.T) {
|
func TestFinishVoidNone(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
FinishVoid()
|
FinishVoid()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFinishErr(t *testing.T) {
|
func TestFinishErr(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var total uint32
|
var total uint32
|
||||||
err := Finish(func() error {
|
err := Finish(func() error {
|
||||||
atomic.AddUint32(&total, 2)
|
atomic.AddUint32(&total, 2)
|
||||||
@@ -63,6 +70,8 @@ func TestFinishErr(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestFinishVoid(t *testing.T) {
|
func TestFinishVoid(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var total uint32
|
var total uint32
|
||||||
FinishVoid(func() {
|
FinishVoid(func() {
|
||||||
atomic.AddUint32(&total, 2)
|
atomic.AddUint32(&total, 2)
|
||||||
@@ -75,70 +84,107 @@ func TestFinishVoid(t *testing.T) {
|
|||||||
assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
|
assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMap(t *testing.T) {
|
func TestForEach(t *testing.T) {
|
||||||
tests := []struct {
|
const tasks = 1000
|
||||||
mapper MapFunc
|
|
||||||
expect int
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
mapper: func(item interface{}, writer Writer) {
|
|
||||||
v := item.(int)
|
|
||||||
writer.Write(v * v)
|
|
||||||
},
|
|
||||||
expect: 30,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
mapper: func(item interface{}, writer Writer) {
|
|
||||||
v := item.(int)
|
|
||||||
if v%2 == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
writer.Write(v * v)
|
|
||||||
},
|
|
||||||
expect: 10,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
mapper: func(item interface{}, writer Writer) {
|
|
||||||
v := item.(int)
|
|
||||||
if v%2 == 0 {
|
|
||||||
panic(v)
|
|
||||||
}
|
|
||||||
writer.Write(v * v)
|
|
||||||
},
|
|
||||||
expect: 10,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
t.Run("all", func(t *testing.T) {
|
||||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
defer goleak.VerifyNone(t)
|
||||||
channel := Map(func(source chan<- interface{}) {
|
|
||||||
for i := 1; i < 5; i++ {
|
var count uint32
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < tasks; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, func(item interface{}) {
|
||||||
|
atomic.AddUint32(&count, 1)
|
||||||
|
}, WithWorkers(-1))
|
||||||
|
|
||||||
|
assert.Equal(t, tasks, int(count))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("odd", func(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
var count uint32
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < tasks; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, func(item interface{}) {
|
||||||
|
if item.(int)%2 == 0 {
|
||||||
|
atomic.AddUint32(&count, 1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.Equal(t, tasks/2, int(count))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("all", func(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
assert.PanicsWithValue(t, "foo", func() {
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < tasks; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
}, test.mapper, WithWorkers(-1))
|
}, func(item interface{}) {
|
||||||
|
panic("foo")
|
||||||
var result int
|
})
|
||||||
for v := range channel {
|
|
||||||
result += v.(int)
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, test.expect, result)
|
|
||||||
})
|
})
|
||||||
}
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGeneratePanic(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
t.Run("all", func(t *testing.T) {
|
||||||
|
assert.PanicsWithValue(t, "foo", func() {
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
|
panic("foo")
|
||||||
|
}, func(item interface{}) {
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapperPanic(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
const tasks = 1000
|
||||||
|
var run int32
|
||||||
|
t.Run("all", func(t *testing.T) {
|
||||||
|
assert.PanicsWithValue(t, "foo", func() {
|
||||||
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < tasks; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
atomic.AddInt32(&run, 1)
|
||||||
|
panic("foo")
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
})
|
||||||
|
})
|
||||||
|
assert.True(t, atomic.LoadInt32(&run) < tasks/2)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduce(t *testing.T) {
|
func TestMapReduce(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
name string
|
||||||
mapper MapperFunc
|
mapper MapperFunc
|
||||||
reducer ReducerFunc
|
reducer ReducerFunc
|
||||||
expectErr error
|
expectErr error
|
||||||
expectValue interface{}
|
expectValue interface{}
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
name: "simple",
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
expectValue: 30,
|
expectValue: 30,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with error",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -149,6 +195,7 @@ func TestMapReduce(t *testing.T) {
|
|||||||
expectErr: errDummy,
|
expectErr: errDummy,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with nil",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -160,6 +207,7 @@ func TestMapReduce(t *testing.T) {
|
|||||||
expectValue: nil,
|
expectValue: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with more",
|
||||||
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
var result int
|
var result int
|
||||||
for item := range pipe {
|
for item := range pipe {
|
||||||
@@ -174,36 +222,74 @@ func TestMapReduce(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
t.Run("MapReduce", func(t *testing.T) {
|
||||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
for _, test := range tests {
|
||||||
if test.mapper == nil {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
if test.mapper == nil {
|
||||||
v := item.(int)
|
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
writer.Write(v * v)
|
v := item.(int)
|
||||||
}
|
writer.Write(v * v)
|
||||||
}
|
|
||||||
if test.reducer == nil {
|
|
||||||
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
|
||||||
var result int
|
|
||||||
for item := range pipe {
|
|
||||||
result += item.(int)
|
|
||||||
}
|
}
|
||||||
writer.Write(result)
|
|
||||||
}
|
}
|
||||||
}
|
if test.reducer == nil {
|
||||||
value, err := MapReduce(func(source chan<- interface{}) {
|
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
for i := 1; i < 5; i++ {
|
var result int
|
||||||
source <- i
|
for item := range pipe {
|
||||||
|
result += item.(int)
|
||||||
|
}
|
||||||
|
writer.Write(result)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
|
value, err := MapReduce(func(source chan<- interface{}) {
|
||||||
|
for i := 1; i < 5; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
|
||||||
|
|
||||||
assert.Equal(t, test.expectErr, err)
|
assert.Equal(t, test.expectErr, err)
|
||||||
assert.Equal(t, test.expectValue, value)
|
assert.Equal(t, test.expectValue, value)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("MapReduce", func(t *testing.T) {
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
if test.mapper == nil {
|
||||||
|
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
v := item.(int)
|
||||||
|
writer.Write(v * v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if test.reducer == nil {
|
||||||
|
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
var result int
|
||||||
|
for item := range pipe {
|
||||||
|
result += item.(int)
|
||||||
|
}
|
||||||
|
writer.Write(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
source := make(chan interface{})
|
||||||
|
go func() {
|
||||||
|
for i := 1; i < 5; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
close(source)
|
||||||
|
}()
|
||||||
|
|
||||||
|
value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
|
||||||
|
assert.Equal(t, test.expectErr, err)
|
||||||
|
assert.Equal(t, test.expectValue, value)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
|
func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
assert.Panics(t, func() {
|
assert.Panics(t, func() {
|
||||||
MapReduce(func(source chan<- interface{}) {
|
MapReduce(func(source chan<- interface{}) {
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@@ -220,18 +306,23 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoid(t *testing.T) {
|
func TestMapReduceVoid(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var value uint32
|
var value uint32
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
name string
|
||||||
mapper MapperFunc
|
mapper MapperFunc
|
||||||
reducer VoidReducerFunc
|
reducer VoidReducerFunc
|
||||||
expectValue uint32
|
expectValue uint32
|
||||||
expectErr error
|
expectErr error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
name: "simple",
|
||||||
expectValue: 30,
|
expectValue: 30,
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with error",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -242,6 +333,7 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
expectErr: errDummy,
|
expectErr: errDummy,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with nil",
|
||||||
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
mapper: func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
v := item.(int)
|
v := item.(int)
|
||||||
if v%3 == 0 {
|
if v%3 == 0 {
|
||||||
@@ -252,6 +344,7 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
expectErr: ErrCancelWithNil,
|
expectErr: ErrCancelWithNil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
name: "cancel with more",
|
||||||
reducer: func(pipe <-chan interface{}, cancel func(error)) {
|
reducer: func(pipe <-chan interface{}, cancel func(error)) {
|
||||||
for item := range pipe {
|
for item := range pipe {
|
||||||
result := atomic.AddUint32(&value, uint32(item.(int)))
|
result := atomic.AddUint32(&value, uint32(item.(int)))
|
||||||
@@ -265,7 +358,7 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(stringx.Rand(), func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
atomic.StoreUint32(&value, 0)
|
atomic.StoreUint32(&value, 0)
|
||||||
|
|
||||||
if test.mapper == nil {
|
if test.mapper == nil {
|
||||||
@@ -296,6 +389,8 @@ func TestMapReduceVoid(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoidWithDelay(t *testing.T) {
|
func TestMapReduceVoidWithDelay(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var result []int
|
var result []int
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||||
source <- 0
|
source <- 0
|
||||||
@@ -318,38 +413,64 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
|
|||||||
assert.Equal(t, 0, result[1])
|
assert.Equal(t, 0, result[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapVoid(t *testing.T) {
|
func TestMapReducePanic(t *testing.T) {
|
||||||
const tasks = 1000
|
defer goleak.VerifyNone(t)
|
||||||
var count uint32
|
|
||||||
MapVoid(func(source chan<- interface{}) {
|
|
||||||
for i := 0; i < tasks; i++ {
|
|
||||||
source <- i
|
|
||||||
}
|
|
||||||
}, func(item interface{}) {
|
|
||||||
atomic.AddUint32(&count, 1)
|
|
||||||
})
|
|
||||||
|
|
||||||
assert.Equal(t, tasks, int(count))
|
assert.Panics(t, func() {
|
||||||
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
|
source <- 0
|
||||||
|
source <- 1
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
i := item.(int)
|
||||||
|
writer.Write(i)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
for range pipe {
|
||||||
|
panic("panic")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReducePanic(t *testing.T) {
|
func TestMapReducePanicOnce(t *testing.T) {
|
||||||
v, err := MapReduce(func(source chan<- interface{}) {
|
defer goleak.VerifyNone(t)
|
||||||
source <- 0
|
|
||||||
source <- 1
|
assert.Panics(t, func() {
|
||||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
i := item.(int)
|
for i := 0; i < 100; i++ {
|
||||||
writer.Write(i)
|
source <- i
|
||||||
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
}
|
||||||
for range pipe {
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
panic("panic")
|
i := item.(int)
|
||||||
}
|
if i == 0 {
|
||||||
|
panic("foo")
|
||||||
|
}
|
||||||
|
writer.Write(i)
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
for range pipe {
|
||||||
|
panic("bar")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
assert.Panics(t, func() {
|
||||||
|
_, _ = MapReduce(func(source chan<- interface{}) {
|
||||||
|
source <- 0
|
||||||
|
source <- 1
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
panic("foo")
|
||||||
|
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
|
||||||
|
panic("bar")
|
||||||
|
})
|
||||||
})
|
})
|
||||||
assert.Nil(t, v)
|
|
||||||
assert.NotNil(t, err)
|
|
||||||
assert.Equal(t, "panic", err.Error())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoidCancel(t *testing.T) {
|
func TestMapReduceVoidCancel(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
var result []int
|
var result []int
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||||
source <- 0
|
source <- 0
|
||||||
@@ -371,13 +492,15 @@ func TestMapReduceVoidCancel(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
||||||
var done syncx.AtomicBool
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
var done int32
|
||||||
var result []int
|
var result []int
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||||
for i := 0; i < defaultWorkers*2; i++ {
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
done.Set(true)
|
atomic.AddInt32(&done, 1)
|
||||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
i := item.(int)
|
i := item.(int)
|
||||||
if i == defaultWorkers/2 {
|
if i == defaultWorkers/2 {
|
||||||
@@ -392,10 +515,12 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
|||||||
})
|
})
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
assert.Equal(t, "anything", err.Error())
|
assert.Equal(t, "anything", err.Error())
|
||||||
assert.True(t, done.True())
|
assert.Equal(t, int32(1), done)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
uids := []int{1, 2, 3}
|
uids := []int{1, 2, 3}
|
||||||
res, err := MapReduce(func(source chan<- interface{}) {
|
res, err := MapReduce(func(source chan<- interface{}) {
|
||||||
for _, uid := range uids {
|
for _, uid := range uids {
|
||||||
@@ -412,33 +537,54 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceVoidPanicInReducer(t *testing.T) {
|
func TestMapReduceVoidPanicInReducer(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
const message = "foo"
|
const message = "foo"
|
||||||
var done syncx.AtomicBool
|
assert.Panics(t, func() {
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
var done int32
|
||||||
|
_ = MapReduceVoid(func(source chan<- interface{}) {
|
||||||
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
|
source <- i
|
||||||
|
}
|
||||||
|
atomic.AddInt32(&done, 1)
|
||||||
|
}, func(item interface{}, writer Writer, cancel func(error)) {
|
||||||
|
i := item.(int)
|
||||||
|
writer.Write(i)
|
||||||
|
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||||
|
panic(message)
|
||||||
|
}, WithWorkers(1))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForEachWithContext(t *testing.T) {
|
||||||
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
var done int32
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
ForEach(func(source chan<- interface{}) {
|
||||||
for i := 0; i < defaultWorkers*2; i++ {
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
done.Set(true)
|
atomic.AddInt32(&done, 1)
|
||||||
}, func(item interface{}, writer Writer, cancel func(error)) {
|
}, func(item interface{}) {
|
||||||
i := item.(int)
|
i := item.(int)
|
||||||
writer.Write(i)
|
if i == defaultWorkers/2 {
|
||||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
cancel()
|
||||||
panic(message)
|
}
|
||||||
}, WithWorkers(1))
|
}, WithContext(ctx))
|
||||||
assert.NotNil(t, err)
|
|
||||||
assert.Equal(t, message, err.Error())
|
|
||||||
assert.True(t, done.True())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMapReduceWithContext(t *testing.T) {
|
func TestMapReduceWithContext(t *testing.T) {
|
||||||
var done syncx.AtomicBool
|
defer goleak.VerifyNone(t)
|
||||||
|
|
||||||
|
var done int32
|
||||||
var result []int
|
var result []int
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
err := MapReduceVoid(func(source chan<- interface{}) {
|
err := MapReduceVoid(func(source chan<- interface{}) {
|
||||||
for i := 0; i < defaultWorkers*2; i++ {
|
for i := 0; i < defaultWorkers*2; i++ {
|
||||||
source <- i
|
source <- i
|
||||||
}
|
}
|
||||||
done.Set(true)
|
atomic.AddInt32(&done, 1)
|
||||||
}, func(item interface{}, writer Writer, c func(error)) {
|
}, func(item interface{}, writer Writer, c func(error)) {
|
||||||
i := item.(int)
|
i := item.(int)
|
||||||
if i == defaultWorkers/2 {
|
if i == defaultWorkers/2 {
|
||||||
@@ -452,7 +598,7 @@ func TestMapReduceWithContext(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}, WithContext(ctx))
|
}, WithContext(ctx))
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
assert.Equal(t, ErrReduceNoOutput, err)
|
assert.Equal(t, context.DeadlineExceeded, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkMapReduce(b *testing.B) {
|
func BenchmarkMapReduce(b *testing.B) {
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultMemProfileRate is the default memory profiling rate.
|
// DefaultMemProfileRate is the default memory profiling rate.
|
||||||
|
|||||||
@@ -10,8 +10,8 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
const timeFormat = "0102150405"
|
const timeFormat = "0102150405"
|
||||||
|
|||||||
@@ -8,8 +8,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/olekukonko/tablewriter"
|
"github.com/olekukonko/tablewriter"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package prof
|
package prof
|
||||||
|
|
||||||
import "github.com/tal-tech/go-zero/core/utils"
|
import "github.com/zeromicro/go-zero/core/utils"
|
||||||
|
|
||||||
type (
|
type (
|
||||||
// A ProfilePoint is a profile time point.
|
// A ProfilePoint is a profile time point.
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package prof
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/utils"
|
"github.com/zeromicro/go-zero/core/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestProfiler(t *testing.T) {
|
func TestProfiler(t *testing.T) {
|
||||||
|
|||||||
30
core/prof/runtime.go
Normal file
30
core/prof/runtime.go
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
package prof
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"runtime"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultInterval = time.Second * 5
|
||||||
|
mega = 1024 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
func DisplayStats(interval ...time.Duration) {
|
||||||
|
duration := defaultInterval
|
||||||
|
for _, val := range interval {
|
||||||
|
duration = val
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(duration)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for range ticker.C {
|
||||||
|
var m runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&m)
|
||||||
|
fmt.Printf("Goroutines: %d, Alloc: %vm, TotalAlloc: %vm, Sys: %vm, NumGC: %v\n",
|
||||||
|
runtime.NumGoroutine(), m.Alloc/mega, m.TotalAlloc/mega, m.Sys/mega, m.NumGC)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
@@ -6,9 +6,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/syncx"
|
"github.com/zeromicro/go-zero/core/syncx"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrNoAvailablePusher indicates no pusher available.
|
// ErrNoAvailablePusher indicates no pusher available.
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package queue
|
package queue
|
||||||
|
|
||||||
import "github.com/tal-tech/go-zero/core/errorx"
|
import "github.com/zeromicro/go-zero/core/errorx"
|
||||||
|
|
||||||
// A MultiPusher is a pusher that can push messages to multiple underlying pushers.
|
// A MultiPusher is a pusher that can push messages to multiple underlying pushers.
|
||||||
type MultiPusher struct {
|
type MultiPusher struct {
|
||||||
|
|||||||
@@ -6,11 +6,11 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/rescue"
|
"github.com/zeromicro/go-zero/core/rescue"
|
||||||
"github.com/tal-tech/go-zero/core/stat"
|
"github.com/zeromicro/go-zero/core/stat"
|
||||||
"github.com/tal-tech/go-zero/core/threading"
|
"github.com/zeromicro/go-zero/core/threading"
|
||||||
"github.com/tal-tech/go-zero/core/timex"
|
"github.com/zeromicro/go-zero/core/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
const queueName = "queue"
|
const queueName = "queue"
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"github.com/tal-tech/go-zero/core/mathx"
|
"github.com/zeromicro/go-zero/core/mathx"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package rescue
|
package rescue
|
||||||
|
|
||||||
import "github.com/tal-tech/go-zero/core/logx"
|
import "github.com/zeromicro/go-zero/core/logx"
|
||||||
|
|
||||||
// Recover is used with defer to do cleanup on panics.
|
// Recover is used with defer to do cleanup on panics.
|
||||||
// Use it like:
|
// Use it like:
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user