Compare commits

...

41 Commits

Author SHA1 Message Date
Kevin Wan
7727d70634 chore: update goctl version (#1497) 2022-02-01 09:50:26 +08:00
Kevin Wan
5f9d101bc6 feat: add runtime stats monitor (#1496) 2022-02-01 01:34:25 +08:00
Kevin Wan
6c2abe7474 fix: goroutine stuck on edge case (#1495)
* fix: goroutine stuck on edge case

* refactor: simplify mapreduce implementation
2022-01-30 13:09:21 +08:00
Kevin Wan
14a902c1a7 feat: handling panic in mapreduce, panic in calling goroutine, not inside goroutines (#1490)
* feat: handle panic

* chore: update fuzz test

* chore: optimize square sum algorithm
2022-01-28 10:59:41 +08:00
Kevin Wan
5ad6a6d229 Update readme-cn.md
add slogan
2022-01-27 17:16:30 +08:00
Kevin Wan
6f4b97864a chore: improve migrate confirmation (#1488) 2022-01-27 11:30:35 +08:00
Kevin Wan
0e0abc3a95 chore: update warning message (#1487) 2022-01-26 23:47:57 +08:00
anqiansong
696fda1db4 patch: goctl migrate (#1485)
* Add signal check
* Add deprecated pkg check

* fix typo `replacementBuilderx`

* output to console if verbose

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-26 23:24:25 +08:00
Kevin Wan
c1d2634427 chore: update go version for goctl (#1484) 2022-01-26 14:27:43 +08:00
Kevin Wan
4b7a680ac5 refactor: rename from tal-tech to zeromicro for goctl (#1481) 2022-01-25 23:15:07 +08:00
Kevin Wan
b3e7d2901f Feature/trie ac automation (#1479)
* fix: trie ac automation issues

* fix: trie ac automation issues

* fix: trie ac automation issues

* fix: trie ac automation issues
2022-01-25 11:14:56 +08:00
anqiansong
cdf7ec213c fix #1468 (#1478)
Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-24 22:23:20 +08:00
Kevin Wan
f1102fb262 chore: optimize string search with Aho–Corasick algorithm (#1476)
* chore: optimize string search with Aho–Corasick algorithm

* chore: optimize keywords replacer

* fix: replacer bugs

* chore: reorder members
2022-01-23 23:37:02 +08:00
Keqi Huang
09d1fad6e0 Polish the words in readme.md (#1475) 2022-01-22 12:20:11 +08:00
Kevin Wan
379c65a3ef docs: add go-zero users (#1473) 2022-01-20 22:36:17 +08:00
Kevin Wan
fdc7f64d6f chore: update unauthorized callback calling order (#1469)
* chore: update unauthorized callback calling order

* chore: add comments
2022-01-20 21:09:45 +08:00
anqiansong
df0f8ed59e Fix/issue#1289 (#1460)
* fix #1289

* Add unit test case

* fix `jwtTransKey`

* fix `jwtTransKey`

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-18 11:52:30 +08:00
anqiansong
c903966fc7 patch: save missing templates to disk (#1463)
Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-18 10:45:05 +08:00
anqiansong
e57fa8ff53 Fix/issue#1447 (#1458)
* Add data for template to render

* fix #1447

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-18 10:36:38 +08:00
Kevin Wan
bf2feee5b7 feat: implement console plain output for debug logs (#1456)
* feat: implement console plain output for debug logs

* chore: rename console encoding to plain

* chore: refactor names
2022-01-17 12:43:15 +08:00
Letian Jiang
ce05c429fc chore: check interface satisfaction w/o allocating new variable (#1454) 2022-01-16 23:34:42 +08:00
Kevin Wan
272a3f347d chore: remove jwt deprecated (#1452) 2022-01-16 10:34:44 +08:00
shenbaise9527
13db7a1931 feat: support LTrim method for redis (#1443) 2022-01-16 10:27:34 +08:00
Kevin Wan
468c237189 chore: upgrade dependencies (#1444)
* chore: upgrade dependencies

* ci: upgrade go to 1.15
2022-01-14 11:01:02 +08:00
Kevin Wan
b9b80c068b ci: add translator action (#1441) 2022-01-12 17:57:39 +08:00
anqiansong
9b592b3dee Feature rpc protoc (#1251)
* code generation by protoc

* generate pb by protoc direct

* support: grpc code generation by protoc directly

* format code

* check --go_out & --go-grpc_out

* fix typo

* Update version

* fix typo

* optimize: remove deprecated unit test

* format code

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-11 20:34:25 +08:00
Kevin Wan
2203809e5e chore: fix typo (#1437) 2022-01-11 20:23:59 +08:00
Kevin Wan
8d6d37f71e remove unnecessary drain, fix data race (#1435)
* remove unnecessary drain, fix data race

* chore: fix parameter order

* refactor: rename MapVoid to ForEach in mr
2022-01-11 16:17:51 +08:00
Kevin Wan
ea4f2af67f fix: mr goroutine leak on context deadline (#1433)
* fix: mr goroutine leak on context deadline

* test: update fx test check
2022-01-10 22:06:10 +08:00
Kevin Wan
53af194ef9 chore: refactor periodlimit (#1428)
* chore: refactor periodlimit

* chore: add comments
2022-01-09 16:22:34 +08:00
Kevin Wan
5e0e2d2b14 docs: add go-zero users (#1425) 2022-01-08 21:41:27 +08:00
Kevin Wan
74c99184c5 docs: add go-zero users (#1424) 2022-01-08 17:08:44 +08:00
Kevin Wan
eb4b86137a fix: golint issue (#1423) 2022-01-08 16:06:56 +08:00
Kevin Wan
9c4f4f3b4e update docs (#1421) 2022-01-07 12:08:45 +08:00
spectatorMrZ
240132e7c7 Fix pg model generation without tag (#1407)
1. fix missing tags on generated pg model structs
2. add pg command test from datasource
2022-01-07 10:45:26 +08:00
anqiansong
9d67fc4cfb feat: Add migrate (#1419)
* Add migrate

* Remove unused module

* refactor filename

* rename refactor to migrate

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-06 18:48:34 +08:00
Kevin Wan
892f93a716 docs: update install readme (#1417) 2022-01-05 12:31:49 +08:00
Kevin Wan
ba6a7c9dc8 chore: refactor rest/timeouthandler (#1415) 2022-01-05 11:17:10 +08:00
Kevin Wan
a91c3907a8 feat: rename module from tal-tech to zeromicro (#1413) 2022-01-04 15:51:32 +08:00
Kevin Wan
e267d94ee1 chore: update go-zero to v1.2.5 (#1410) 2022-01-03 21:54:53 +08:00
anqiansong
89ce5e492b refactor file|path (#1409)
Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-03 21:32:40 +08:00
446 changed files with 4007 additions and 1765 deletions

View File

@@ -15,7 +15,7 @@ jobs:
- name: Set up Go 1.x - name: Set up Go 1.x
uses: actions/setup-go@v2 uses: actions/setup-go@v2
with: with:
go-version: ^1.14 go-version: ^1.15
id: go id: go
- name: Check out code into the Go module directory - name: Check out code into the Go module directory

.github/workflows/issue-translator.yml (new file, 18 lines added)
View File

@@ -0,0 +1,18 @@
name: 'issue-translator'
on:
issue_comment:
types: [created]
issues:
types: [opened]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: tomsun28/issues-translate-action@v2.6
with:
IS_MODIFY_TITLE: true
# not required, default false. Decide whether to modify the issue title
# if true, the robot account @Issues-translate-bot must have modification permissions, invite @Issues-translate-bot to your project or use your custom bot.
CUSTOM_BOT_NOTE: Bot detected the issue body's language is not English, translate it automatically. 👯👭🏻🧑‍🤝‍🧑👫🧑🏿‍🤝‍🧑🏻👩🏾‍🤝‍👨🏿👬🏿
# not required. Customize the translation robot prefix message.

.gitignore (3 lines changed)
View File

@@ -16,7 +16,8 @@
**/logs **/logs
# for test purpose # for test purpose
adhoc **/adhoc
**/testdata
# gitlab ci # gitlab ci
.cache .cache

View File

@@ -40,7 +40,7 @@ We will help you to contribute in different areas like filing issues, developing
getting your work reviewed and merged. getting your work reviewed and merged.
If you have questions about the development process, If you have questions about the development process,
feel free to [file an issue](https://github.com/tal-tech/go-zero/issues/new/choose). feel free to [file an issue](https://github.com/zeromicro/go-zero/issues/new/choose).
## Find something to work on ## Find something to work on
@@ -50,10 +50,10 @@ Here is how you get started.
### Find a good first topic ### Find a good first topic
[go-zero](https://github.com/tal-tech/go-zero) has beginner-friendly issues that provide a good first issue. [go-zero](https://github.com/zeromicro/go-zero) has beginner-friendly issues that provide a good first issue.
For example, [go-zero](https://github.com/tal-tech/go-zero) has For example, [go-zero](https://github.com/zeromicro/go-zero) has
[help wanted](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and [help wanted](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
[good first issue](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22) [good first issue](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
labels for issues that should not need deep knowledge of the system. labels for issues that should not need deep knowledge of the system.
We can help new contributors who wish to work on such issues. We can help new contributors who wish to work on such issues.
@@ -79,7 +79,7 @@ This is a rough outline of what a contributor's workflow looks like:
- Create a topic branch from where to base the contribution. This is usually master. - Create a topic branch from where to base the contribution. This is usually master.
- Make commits of logical units. - Make commits of logical units.
- Push changes in a topic branch to a personal fork of the repository. - Push changes in a topic branch to a personal fork of the repository.
- Submit a pull request to [go-zero](https://github.com/tal-tech/go-zero). - Submit a pull request to [go-zero](https://github.com/zeromicro/go-zero).
## Creating Pull Requests ## Creating Pull Requests

View File

@@ -4,8 +4,8 @@ import (
"errors" "errors"
"strconv" "strconv"
"github.com/tal-tech/go-zero/core/hash" "github.com/zeromicro/go-zero/core/hash"
"github.com/tal-tech/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/core/stores/redis"
) )
const ( const (

View File

@@ -4,7 +4,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stores/redis/redistest" "github.com/zeromicro/go-zero/core/stores/redis/redistest"
) )
func TestRedisBitSet_New_Set_Test(t *testing.T) { func TestRedisBitSet_New_Set_Test(t *testing.T) {

View File

@@ -6,11 +6,11 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/tal-tech/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/proc" "github.com/zeromicro/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/stat" "github.com/zeromicro/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
const ( const (

View File

@@ -8,7 +8,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stat" "github.com/zeromicro/go-zero/core/stat"
) )
func init() { func init() {

View File

@@ -6,7 +6,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stat" "github.com/zeromicro/go-zero/core/stat"
) )
func init() { func init() {

View File

@@ -4,8 +4,8 @@ import (
"math" "math"
"time" "time"
"github.com/tal-tech/go-zero/core/collection" "github.com/zeromicro/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/mathx"
) )
const ( const (

View File

@@ -7,9 +7,9 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/collection" "github.com/zeromicro/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/stat" "github.com/zeromicro/go-zero/core/stat"
) )
const ( const (

View File

@@ -8,8 +8,8 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/iox" "github.com/zeromicro/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
) )
func TestEnterToContinue(t *testing.T) { func TestEnterToContinue(t *testing.T) {

View File

@@ -7,7 +7,7 @@ import (
"encoding/base64" "encoding/base64"
"errors" "errors"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
// ErrPaddingSize indicates bad padding size. // ErrPaddingSize indicates bad padding size.

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs" "github.com/zeromicro/go-zero/core/fs"
) )
const ( const (

View File

@@ -6,9 +6,9 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
) )
const ( const (

View File

@@ -4,7 +4,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
type ( type (

View File

@@ -6,7 +6,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
) )
const duration = time.Millisecond * 50 const duration = time.Millisecond * 50

View File

@@ -4,7 +4,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
) )
func TestSafeMap(t *testing.T) { func TestSafeMap(t *testing.T) {

View File

@@ -1,8 +1,8 @@
package collection package collection
import ( import (
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
const ( const (

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
func init() { func init() {

View File

@@ -5,9 +5,9 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/threading" "github.com/zeromicro/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
const drainWorkers = 8 const drainWorkers = 8

View File

@@ -8,10 +8,10 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
const ( const (

View File

@@ -7,7 +7,7 @@ import (
"os" "os"
"path" "path"
"github.com/tal-tech/go-zero/core/mapping" "github.com/zeromicro/go-zero/core/mapping"
) )
var loaders = map[string]func([]byte, interface{}) error{ var loaders = map[string]func([]byte, interface{}) error{

View File

@@ -6,8 +6,8 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs" "github.com/zeromicro/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/hash" "github.com/zeromicro/go-zero/core/hash"
) )
func TestLoadConfig_notExists(t *testing.T) { func TestLoadConfig_notExists(t *testing.T) {

View File

@@ -7,7 +7,7 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/tal-tech/go-zero/core/iox" "github.com/zeromicro/go-zero/core/iox"
) )
// PropertyError represents a configuration error message. // PropertyError represents a configuration error message.

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs" "github.com/zeromicro/go-zero/core/fs"
) )
func TestProperties(t *testing.T) { func TestProperties(t *testing.T) {

View File

@@ -3,7 +3,7 @@ package contextx
import ( import (
"context" "context"
"github.com/tal-tech/go-zero/core/mapping" "github.com/zeromicro/go-zero/core/mapping"
) )
const contextTagKey = "ctx" const contextTagKey = "ctx"

View File

@@ -1,6 +1,6 @@
package discov package discov
import "github.com/tal-tech/go-zero/core/discov/internal" import "github.com/zeromicro/go-zero/core/discov/internal"
// RegisterAccount registers the username/password to the given etcd cluster. // RegisterAccount registers the username/password to the given etcd cluster.
func RegisterAccount(endpoints []string, user, pass string) { func RegisterAccount(endpoints []string, user, pass string) {

View File

@@ -4,8 +4,8 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/discov/internal" "github.com/zeromicro/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
) )
func TestRegisterAccount(t *testing.T) { func TestRegisterAccount(t *testing.T) {

View File

@@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/tal-tech/go-zero/core/discov/internal" "github.com/zeromicro/go-zero/core/discov/internal"
) )
const ( const (

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/discov/internal" "github.com/zeromicro/go-zero/core/discov/internal"
) )
var mockLock sync.Mutex var mockLock sync.Mutex

View File

@@ -4,7 +4,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
) )
func TestAccount(t *testing.T) { func TestAccount(t *testing.T) {

View File

@@ -9,11 +9,11 @@ import (
"sync" "sync"
"time" "time"
"github.com/tal-tech/go-zero/core/contextx" "github.com/zeromicro/go-zero/core/contextx"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading" "github.com/zeromicro/go-zero/core/threading"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )

View File

@@ -7,10 +7,10 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/contextx" "github.com/zeromicro/go-zero/core/contextx"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
"go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )

View File

@@ -1,12 +1,12 @@
package discov package discov
import ( import (
"github.com/tal-tech/go-zero/core/discov/internal" "github.com/zeromicro/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc" "github.com/zeromicro/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading" "github.com/zeromicro/go-zero/core/threading"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )

View File

@@ -8,10 +8,10 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/discov/internal" "github.com/zeromicro/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )

View File

@@ -4,9 +4,9 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/tal-tech/go-zero/core/discov/internal" "github.com/zeromicro/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
) )
type ( type (

View File

@@ -5,8 +5,8 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/discov/internal" "github.com/zeromicro/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
) )
const ( const (

View File

@@ -4,7 +4,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/tal-tech/go-zero/core/threading" "github.com/zeromicro/go-zero/core/threading"
) )
// A DelayExecutor delays a tasks on given delay interval. // A DelayExecutor delays a tasks on given delay interval.

View File

@@ -3,8 +3,8 @@ package executors
import ( import (
"time" "time"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
// A LessExecutor is an executor to limit execution once within given time interval. // A LessExecutor is an executor to limit execution once within given time interval.

View File

@@ -5,7 +5,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
func TestLessExecutor_DoOrDiscard(t *testing.T) { func TestLessExecutor_DoOrDiscard(t *testing.T) {

View File

@@ -6,11 +6,11 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/proc" "github.com/zeromicro/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading" "github.com/zeromicro/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
const idleRound = 10 const idleRound = 10

View File

@@ -8,7 +8,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
const threshold = 10 const threshold = 10

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs" "github.com/zeromicro/go-zero/core/fs"
) )
const ( const (

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs" "github.com/zeromicro/go-zero/core/fs"
) )
func TestSplitLineChunks(t *testing.T) { func TestSplitLineChunks(t *testing.T) {

View File

@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs" "github.com/zeromicro/go-zero/core/fs"
) )
func TestRangeReader(t *testing.T) { func TestRangeReader(t *testing.T) {

View File

@@ -4,7 +4,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"github.com/tal-tech/go-zero/core/hash" "github.com/zeromicro/go-zero/core/hash"
) )
// TempFileWithText creates the temporary file with the given content, // TempFileWithText creates the temporary file with the given content,

View File

@@ -1,6 +1,6 @@
package fx package fx
import "github.com/tal-tech/go-zero/core/threading" import "github.com/zeromicro/go-zero/core/threading"
// Parallel runs fns parallelly and waits for done. // Parallel runs fns parallelly and waits for done.
func Parallel(fns ...func()) { func Parallel(fns ...func()) {

View File

@@ -1,6 +1,6 @@
package fx package fx
import "github.com/tal-tech/go-zero/core/errorx" import "github.com/zeromicro/go-zero/core/errorx"
const defaultRetryTimes = 3 const defaultRetryTimes = 3

View File

@@ -4,9 +4,9 @@ import (
"sort" "sort"
"sync" "sync"
"github.com/tal-tech/go-zero/core/collection" "github.com/zeromicro/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/threading" "github.com/zeromicro/go-zero/core/threading"
) )
const ( const (

View File

@@ -13,7 +13,8 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
"go.uber.org/goleak"
) )
func TestBuffer(t *testing.T) { func TestBuffer(t *testing.T) {
@@ -563,9 +564,6 @@ func equal(t *testing.T, stream Stream, data []interface{}) {
} }
func runCheckedTest(t *testing.T, fn func(t *testing.T)) { func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
goroutines := runtime.NumGoroutine() defer goleak.VerifyNone(t)
fn(t) fn(t)
// let scheduler schedule first
time.Sleep(time.Millisecond)
assert.True(t, runtime.NumGoroutine() <= goroutines)
} }
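
The hunk above replaces a manual runtime.NumGoroutine() comparison with goleak. A minimal sketch of the new leak check, assuming go.uber.org/goleak is available as shown in the imports above:

package fx

import (
	"testing"

	"go.uber.org/goleak"
)

// runCheckedTest fails the test if fn leaves goroutines running once it
// returns, instead of comparing goroutine counts before and after.
func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
	defer goleak.VerifyNone(t)
	fn(t)
}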

View File

@@ -6,8 +6,8 @@ import (
"strconv" "strconv"
"sync" "sync"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/mapping" "github.com/zeromicro/go-zero/core/mapping"
) )
const ( const (

View File

@@ -6,7 +6,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/mathx"
) )
const ( const (

View File

@@ -9,8 +9,8 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs" "github.com/zeromicro/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
) )
func TestReadText(t *testing.T) { func TestReadText(t *testing.T) {

View File

@@ -5,12 +5,11 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/tal-tech/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/core/stores/redis"
) )
const ( // to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key const periodScript = `local limit = tonumber(ARGV[1])
periodScript = `local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2]) local window = tonumber(ARGV[2])
local current = redis.call("INCRBY", KEYS[1], 1) local current = redis.call("INCRBY", KEYS[1], 1)
if current == 1 then if current == 1 then
@@ -23,8 +22,6 @@ elseif current == limit then
else else
return 0 return 0
end` end`
zoneDiff = 3600 * 8 // GMT+8 for our services
)
const ( const (
// Unknown means not initialized state. // Unknown means not initialized state.
@@ -104,7 +101,9 @@ func (h *PeriodLimit) Take(key string) (int, error) {
func (h *PeriodLimit) calcExpireSeconds() int { func (h *PeriodLimit) calcExpireSeconds() int {
if h.align { if h.align {
unix := time.Now().Unix() + zoneDiff now := time.Now()
_, offset := now.Zone()
unix := now.Unix() + int64(offset)
return h.period - int(unix%int64(h.period)) return h.period - int(unix%int64(h.period))
} }
@@ -112,6 +111,8 @@ func (h *PeriodLimit) calcExpireSeconds() int {
} }
// Align returns a func to customize a PeriodLimit with alignment. // Align returns a func to customize a PeriodLimit with alignment.
// For example, if we want to limit end users with 5 sms verification messages every day,
// we need to align with the local timezone and the start of the day.
func Align() PeriodOption { func Align() PeriodOption {
return func(l *PeriodLimit) { return func(l *PeriodLimit) {
l.align = true l.align = true
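
To illustrate the change above: when Align() is enabled, the expiry is now derived from the caller's local timezone offset rather than the previously hardcoded GMT+8 constant. A minimal sketch with a hypothetical standalone helper (period is in seconds):

package limit

import "time"

// alignedExpireSeconds mirrors the new calcExpireSeconds logic: seconds left
// in the current period, with the window anchored to local time via the zone
// offset instead of a fixed +8h shift.
func alignedExpireSeconds(period int) int {
	now := time.Now()
	_, offset := now.Zone()
	unix := now.Unix() + int64(offset)
	return period - int(unix%int64(period))
}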

View File

@@ -5,8 +5,8 @@ import (
"github.com/alicebob/miniredis/v2" "github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/stores/redis/redistest" "github.com/zeromicro/go-zero/core/stores/redis/redistest"
) )
func TestPeriodLimit_Take(t *testing.T) { func TestPeriodLimit_Take(t *testing.T) {

View File

@@ -7,8 +7,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/core/stores/redis"
xrate "golang.org/x/time/rate" xrate "golang.org/x/time/rate"
) )
@@ -85,8 +85,8 @@ func (lim *TokenLimiter) Allow() bool {
} }
// AllowN reports whether n events may happen at time now. // AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate rate. // Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise use Reserve or Wait. // Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool { func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n) return lim.reserveN(now, n)
} }
@@ -112,7 +112,8 @@ func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
// Lua boolean false -> r Nil bulk reply // Lua boolean false -> r Nil bulk reply
if err == redis.Nil { if err == redis.Nil {
return false return false
} else if err != nil { }
if err != nil {
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err) logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
lim.startMonitor() lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n) return lim.rescueLimiter.AllowN(now, n)

View File

@@ -6,9 +6,9 @@ import (
"github.com/alicebob/miniredis/v2" "github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/stores/redis/redistest" "github.com/zeromicro/go-zero/core/stores/redis/redistest"
) )
func init() { func init() {

View File

@@ -7,11 +7,11 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/tal-tech/go-zero/core/collection" "github.com/zeromicro/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stat" "github.com/zeromicro/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
const ( const (

View File

@@ -8,11 +8,11 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/collection" "github.com/zeromicro/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/stat" "github.com/zeromicro/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
) )
const ( const (

View File

@@ -3,7 +3,7 @@ package load
import ( import (
"io" "io"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
) )
// A ShedderGroup is a manager to manage key based shedders. // A ShedderGroup is a manager to manage key based shedders.

View File

@@ -4,8 +4,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stat" "github.com/zeromicro/go-zero/core/stat"
) )
type ( type (

View File

@@ -3,10 +3,11 @@ package logx
// A LogConf is a logging config. // A LogConf is a logging config.
type LogConf struct { type LogConf struct {
ServiceName string `json:",optional"` ServiceName string `json:",optional"`
Mode string `json:",default=console,options=console|file|volume"` Mode string `json:",default=console,options=[console,file,volume]"`
Encoding string `json:",default=json,options=[json,plain]"`
TimeFormat string `json:",optional"` TimeFormat string `json:",optional"`
Path string `json:",default=logs"` Path string `json:",default=logs"`
Level string `json:",default=info,options=info|error|severe"` Level string `json:",default=info,options=[info,error,severe]"`
Compress bool `json:",optional"` Compress bool `json:",optional"`
KeepDays int `json:",optional"` KeepDays int `json:",optional"`
StackCooldownMillis int `json:",default=100"` StackCooldownMillis int `json:",default=100"`
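
The new Encoding field above selects between the default json output and the plain console output added in this change. A minimal usage sketch with illustrative values, using the SetUp function shown later in this diff:

package main

import "github.com/zeromicro/go-zero/core/logx"

func main() {
	// Encoding "plain" enables the tab-separated console output;
	// omitting it keeps the default "json" encoding.
	if err := logx.SetUp(logx.LogConf{
		Mode:     "console",
		Encoding: "plain",
		Level:    "info",
	}); err != nil {
		panic(err)
	}
	logx.Info("hello go-zero")
}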

View File

@@ -5,7 +5,7 @@ import (
"io" "io"
"time" "time"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
const durationCallerDepth = 3 const durationCallerDepth = 3
@@ -79,10 +79,15 @@ func (l *durationLogger) WithDuration(duration time.Duration) Logger {
} }
func (l *durationLogger) write(writer io.Writer, level string, val interface{}) { func (l *durationLogger) write(writer io.Writer, level string, val interface{}) {
outputJson(writer, &durationLogger{ switch encoding {
Timestamp: getTimestamp(), case plainEncodingType:
Level: level, writePlainAny(writer, level, val, l.Duration)
Content: val, default:
Duration: l.Duration, outputJson(writer, &durationLogger{
}) Timestamp: getTimestamp(),
Level: level,
Content: val,
Duration: l.Duration,
})
}
} }

View File

@@ -37,6 +37,19 @@ func TestWithDurationInfo(t *testing.T) {
assert.True(t, strings.Contains(builder.String(), "duration"), builder.String()) assert.True(t, strings.Contains(builder.String(), "duration"), builder.String())
} }
func TestWithDurationInfoConsole(t *testing.T) {
old := encoding
encoding = plainEncodingType
defer func() {
encoding = old
}()
var builder strings.Builder
log.SetOutput(&builder)
WithDuration(time.Second).Info("foo")
assert.True(t, strings.Contains(builder.String(), "ms"), builder.String())
}
func TestWithDurationInfof(t *testing.T) { func TestWithDurationInfof(t *testing.T) {
var builder strings.Builder var builder strings.Builder
log.SetOutput(&builder) log.SetOutput(&builder)

View File

@@ -4,8 +4,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
type limitedExecutor struct { type limitedExecutor struct {

View File

@@ -6,7 +6,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
func TestLimitedExecutor_logOrDiscard(t *testing.T) { func TestLimitedExecutor_logOrDiscard(t *testing.T) {

View File

@@ -1,6 +1,7 @@
package logx package logx
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@@ -17,9 +18,9 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/tal-tech/go-zero/core/iox" "github.com/zeromicro/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/sysx" "github.com/zeromicro/go-zero/core/sysx"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
const ( const (
@@ -31,6 +32,15 @@ const (
SevereLevel SevereLevel
) )
const (
jsonEncodingType = iota
plainEncodingType
jsonEncoding = "json"
plainEncoding = "plain"
plainEncodingSep = '\t'
)
const ( const (
accessFilename = "access.log" accessFilename = "access.log"
errorFilename = "error.log" errorFilename = "error.log"
@@ -65,6 +75,7 @@ var (
timeFormat = "2006-01-02T15:04:05.000Z07" timeFormat = "2006-01-02T15:04:05.000Z07"
writeConsole bool writeConsole bool
logLevel uint32 logLevel uint32
encoding = jsonEncodingType
// use uint32 for atomic operations // use uint32 for atomic operations
disableStat uint32 disableStat uint32
infoLog io.WriteCloser infoLog io.WriteCloser
@@ -124,6 +135,12 @@ func SetUp(c LogConf) error {
if len(c.TimeFormat) > 0 { if len(c.TimeFormat) > 0 {
timeFormat = c.TimeFormat timeFormat = c.TimeFormat
} }
switch c.Encoding {
case plainEncoding:
encoding = plainEncodingType
default:
encoding = jsonEncodingType
}
switch c.Mode { switch c.Mode {
case consoleMode: case consoleMode:
@@ -407,21 +424,31 @@ func infoTextSync(msg string) {
} }
func outputAny(writer io.Writer, level string, val interface{}) { func outputAny(writer io.Writer, level string, val interface{}) {
info := logEntry{ switch encoding {
Timestamp: getTimestamp(), case plainEncodingType:
Level: level, writePlainAny(writer, level, val)
Content: val, default:
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: val,
}
outputJson(writer, info)
} }
outputJson(writer, info)
} }
func outputText(writer io.Writer, level, msg string) { func outputText(writer io.Writer, level, msg string) {
info := logEntry{ switch encoding {
Timestamp: getTimestamp(), case plainEncodingType:
Level: level, writePlainText(writer, level, msg)
Content: msg, default:
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: msg,
}
outputJson(writer, info)
} }
outputJson(writer, info)
} }
func outputError(writer io.Writer, msg string, callDepth int) { func outputError(writer io.Writer, msg string, callDepth int) {
@@ -565,6 +592,62 @@ func statSync(msg string) {
} }
} }
func writePlainAny(writer io.Writer, level string, val interface{}, fields ...string) {
switch v := val.(type) {
case string:
writePlainText(writer, level, v, fields...)
case error:
writePlainText(writer, level, v.Error(), fields...)
case fmt.Stringer:
writePlainText(writer, level, v.String(), fields...)
default:
var buf bytes.Buffer
buf.WriteString(getTimestamp())
buf.WriteByte(plainEncodingSep)
buf.WriteString(level)
for _, item := range fields {
buf.WriteByte(plainEncodingSep)
buf.WriteString(item)
}
buf.WriteByte(plainEncodingSep)
if err := json.NewEncoder(&buf).Encode(val); err != nil {
log.Println(err.Error())
return
}
buf.WriteByte('\n')
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
log.Println(buf.String())
return
}
if _, err := writer.Write(buf.Bytes()); err != nil {
log.Println(err.Error())
}
}
}
func writePlainText(writer io.Writer, level, msg string, fields ...string) {
var buf bytes.Buffer
buf.WriteString(getTimestamp())
buf.WriteByte(plainEncodingSep)
buf.WriteString(level)
for _, item := range fields {
buf.WriteByte(plainEncodingSep)
buf.WriteString(item)
}
buf.WriteByte(plainEncodingSep)
buf.WriteString(msg)
buf.WriteByte('\n')
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
log.Println(buf.String())
return
}
if _, err := writer.Write(buf.Bytes()); err != nil {
log.Println(err.Error())
}
}
type logWriter struct { type logWriter struct {
logger *log.Logger logger *log.Logger
} }

View File

@@ -141,6 +141,78 @@ func TestStructedLogInfov(t *testing.T) {
}) })
} }
func TestStructedLogInfoConsoleAny(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := encoding
encoding = plainEncodingType
defer func() {
encoding = old
}()
Infov(v)
})
}
func TestStructedLogInfoConsoleAnyString(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := encoding
encoding = plainEncodingType
defer func() {
encoding = old
}()
Infov(fmt.Sprint(v...))
})
}
func TestStructedLogInfoConsoleAnyError(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := encoding
encoding = plainEncodingType
defer func() {
encoding = old
}()
Infov(errors.New(fmt.Sprint(v...)))
})
}
func TestStructedLogInfoConsoleAnyStringer(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := encoding
encoding = plainEncodingType
defer func() {
encoding = old
}()
Infov(ValStringer{
val: fmt.Sprint(v...),
})
})
}
func TestStructedLogInfoConsoleText(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := encoding
encoding = plainEncodingType
defer func() {
encoding = old
}()
Info(fmt.Sprint(v...))
})
}
func TestStructedLogSlow(t *testing.T) { func TestStructedLogSlow(t *testing.T) {
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) { doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
slowLog = writer slowLog = writer
@@ -432,6 +504,17 @@ func doTestStructedLog(t *testing.T, level string, setup func(writer io.WriteClo
assert.True(t, strings.Contains(val, message)) assert.True(t, strings.Contains(val, message))
} }
func doTestStructedLogConsole(t *testing.T, setup func(writer io.WriteCloser),
write func(...interface{})) {
const message = "hello there"
writer := new(mockWriter)
setup(writer)
atomic.StoreUint32(&initialized, 1)
write(message)
println(writer.String())
assert.True(t, strings.Contains(writer.String(), message))
}
func testSetLevelTwiceWithMode(t *testing.T, mode string) { func testSetLevelTwiceWithMode(t *testing.T, mode string) {
SetUp(LogConf{ SetUp(LogConf{
Mode: mode, Mode: mode,
@@ -456,3 +539,11 @@ func testSetLevelTwiceWithMode(t *testing.T, mode string) {
ErrorStackf(message) ErrorStackf(message)
assert.Equal(t, 0, writer.builder.Len()) assert.Equal(t, 0, writer.builder.Len())
} }
type ValStringer struct {
val string
}
func (v ValStringer) String() string {
return v.val
}

View File

@@ -13,9 +13,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/tal-tech/go-zero/core/fs" "github.com/zeromicro/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
const ( const (

View File

@@ -8,7 +8,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs" "github.com/zeromicro/go-zero/core/fs"
) )
func TestDailyRotateRuleMarkRotated(t *testing.T) { func TestDailyRotateRuleMarkRotated(t *testing.T) {

View File

@@ -6,7 +6,7 @@ import (
"io" "io"
"time" "time"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
@@ -77,16 +77,24 @@ func (l *traceLogger) WithDuration(duration time.Duration) Logger {
} }
func (l *traceLogger) write(writer io.Writer, level string, val interface{}) { func (l *traceLogger) write(writer io.Writer, level string, val interface{}) {
outputJson(writer, &traceLogger{ traceID := traceIdFromContext(l.ctx)
logEntry: logEntry{ spanID := spanIdFromContext(l.ctx)
Timestamp: getTimestamp(),
Level: level, switch encoding {
Duration: l.Duration, case plainEncodingType:
Content: val, writePlainAny(writer, level, val, l.Duration, traceID, spanID)
}, default:
Trace: traceIdFromContext(l.ctx), outputJson(writer, &traceLogger{
Span: spanIdFromContext(l.ctx), logEntry: logEntry{
}) Timestamp: getTimestamp(),
Level: level,
Duration: l.Duration,
Content: val,
},
Trace: traceID,
Span: spanID,
})
}
} }
// WithContext sets ctx to log, for keeping tracing information. // WithContext sets ctx to log, for keeping tracing information.

View File

@@ -82,6 +82,37 @@ func TestTraceInfo(t *testing.T) {
assert.True(t, strings.Contains(buf.String(), spanKey)) assert.True(t, strings.Contains(buf.String(), spanKey))
} }
func TestTraceInfoConsole(t *testing.T) {
old := encoding
encoding = plainEncodingType
defer func() {
encoding = old
}()
var buf mockWriter
atomic.StoreUint32(&initialized, 1)
infoLog = newLogWriter(log.New(&buf, "", flags))
otp := otel.GetTracerProvider()
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
defer otel.SetTracerProvider(otp)
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
l := WithContext(ctx).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Info(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
buf.Reset()
l.WithDuration(time.Second).Infof(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
buf.Reset()
l.WithDuration(time.Second).Infov(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
}
func TestTraceSlow(t *testing.T) { func TestTraceSlow(t *testing.T) {
var buf mockWriter var buf mockWriter
atomic.StoreUint32(&initialized, 1) atomic.StoreUint32(&initialized, 1)

View File

@@ -3,7 +3,7 @@ package mapping
import ( import (
"io" "io"
"github.com/tal-tech/go-zero/core/jsonx" "github.com/zeromicro/go-zero/core/jsonx"
) )
const jsonTagKey = "json" const jsonTagKey = "json"

View File

@@ -9,9 +9,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/tal-tech/go-zero/core/jsonx" "github.com/zeromicro/go-zero/core/jsonx"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
) )
const ( const (
@@ -742,7 +742,9 @@ func getValueWithChainedKeys(m Valuer, keys []string) (interface{}, bool) {
if len(keys) == 1 { if len(keys) == 1 {
v, ok := m.Value(keys[0]) v, ok := m.Value(keys[0])
return v, ok return v, ok
} else if len(keys) > 1 { }
if len(keys) > 1 {
if v, ok := m.Value(keys[0]); ok { if v, ok := m.Value(keys[0]); ok {
if nextm, ok := v.(map[string]interface{}); ok { if nextm, ok := v.(map[string]interface{}); ok {
return getValueWithChainedKeys(MapValuer(nextm), keys[1:]) return getValueWithChainedKeys(MapValuer(nextm), keys[1:])

View File

@@ -8,7 +8,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
) )
// because json.Number doesn't support strconv.ParseUint(...), // because json.Number doesn't support strconv.ParseUint(...),

View File

@@ -10,7 +10,7 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
) )
const ( const (

View File

@@ -4,7 +4,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx" "github.com/zeromicro/go-zero/core/stringx"
) )
func TestMaxInt(t *testing.T) { func TestMaxInt(t *testing.T) {

View File

@@ -2,7 +2,7 @@ package metric
import ( import (
prom "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_golang/prometheus"
"github.com/tal-tech/go-zero/core/proc" "github.com/zeromicro/go-zero/core/proc"
) )
type ( type (

View File

@@ -2,7 +2,7 @@ package metric
import ( import (
prom "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_golang/prometheus"
"github.com/tal-tech/go-zero/core/proc" "github.com/zeromicro/go-zero/core/proc"
) )
type ( type (

View File

@@ -2,7 +2,7 @@ package metric
import ( import (
prom "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_golang/prometheus"
"github.com/tal-tech/go-zero/core/proc" "github.com/zeromicro/go-zero/core/proc"
) )
type ( type (

View File

@@ -3,12 +3,11 @@ package mr
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"sync" "sync"
"sync/atomic"
"github.com/tal-tech/go-zero/core/errorx" "github.com/zeromicro/go-zero/core/errorx"
"github.com/tal-tech/go-zero/core/lang" "github.com/zeromicro/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/threading"
) )
const ( const (
@@ -24,12 +23,12 @@ var (
) )
type ( type (
// ForEachFunc is used to do element processing, but no output.
ForEachFunc func(item interface{})
// GenerateFunc is used to let callers send elements into source. // GenerateFunc is used to let callers send elements into source.
GenerateFunc func(source chan<- interface{}) GenerateFunc func(source chan<- interface{})
// MapFunc is used to do element processing and write the output to writer. // MapFunc is used to do element processing and write the output to writer.
MapFunc func(item interface{}, writer Writer) MapFunc func(item interface{}, writer Writer)
// VoidMapFunc is used to do element processing, but no output.
VoidMapFunc func(item interface{})
// MapperFunc is used to do element processing and write the output to writer, // MapperFunc is used to do element processing and write the output to writer,
// use cancel func to cancel the processing. // use cancel func to cancel the processing.
MapperFunc func(item interface{}, writer Writer, cancel func(error)) MapperFunc func(item interface{}, writer Writer, cancel func(error))
@@ -42,6 +41,16 @@ type (
// Option defines the method to customize the mapreduce. // Option defines the method to customize the mapreduce.
Option func(opts *mapReduceOptions) Option func(opts *mapReduceOptions)
mapperContext struct {
ctx context.Context
mapper MapFunc
source <-chan interface{}
panicChan *onceChan
collector chan<- interface{}
doneChan <-chan lang.PlaceholderType
workers int
}
mapReduceOptions struct { mapReduceOptions struct {
ctx context.Context ctx context.Context
workers int workers int
@@ -69,7 +78,6 @@ func Finish(fns ...func() error) error {
cancel(err) cancel(err)
} }
}, func(pipe <-chan interface{}, cancel func(error)) { }, func(pipe <-chan interface{}, cancel func(error)) {
drain(pipe)
}, WithWorkers(len(fns))) }, WithWorkers(len(fns)))
} }
@@ -79,7 +87,7 @@ func FinishVoid(fns ...func()) {
return return
} }
MapVoid(func(source chan<- interface{}) { ForEach(func(source chan<- interface{}) {
for _, fn := range fns { for _, fn := range fns {
source <- fn source <- fn
} }
@@ -89,41 +97,74 @@ func FinishVoid(fns ...func()) {
}, WithWorkers(len(fns))) }, WithWorkers(len(fns)))
} }
// Map maps all elements generated from given generate func, and returns an output channel. // ForEach maps all elements from given generate but no output.
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} { func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
options := buildOptions(opts...) options := buildOptions(opts...)
source := buildSource(generate) panicChan := &onceChan{channel: make(chan interface{})}
source := buildSource(generate, panicChan)
collector := make(chan interface{}, options.workers) collector := make(chan interface{}, options.workers)
done := make(chan lang.PlaceholderType) done := make(chan lang.PlaceholderType)
go executeMappers(options.ctx, mapper, source, collector, done, options.workers) go executeMappers(mapperContext{
ctx: options.ctx,
mapper: func(item interface{}, writer Writer) {
mapper(item)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})
return collector for {
select {
case v := <-panicChan.channel:
panic(v)
case _, ok := <-collector:
if !ok {
return
}
}
}
} }
// MapReduce maps all elements generated from given generate func, // MapReduce maps all elements generated from given generate func,
// and reduces the output elements with given reducer. // and reduces the output elements with given reducer.
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) { opts ...Option) (interface{}, error) {
source := buildSource(generate) panicChan := &onceChan{channel: make(chan interface{})}
return MapReduceWithSource(source, mapper, reducer, opts...) source := buildSource(generate, panicChan)
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
} }
// MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer. // MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) { opts ...Option) (interface{}, error) {
panicChan := &onceChan{channel: make(chan interface{})}
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
}
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
reducer ReducerFunc, opts ...Option) (interface{}, error) {
options := buildOptions(opts...) options := buildOptions(opts...)
// output is used to write the final result
output := make(chan interface{}) output := make(chan interface{})
defer func() { defer func() {
// reducer can only write once, if more, panic
for range output { for range output {
panic("more than one element written in reducer") panic("more than one element written in reducer")
} }
}() }()
// collector is used to collect data from mapper, and consume in reducer
collector := make(chan interface{}, options.workers) collector := make(chan interface{}, options.workers)
// if done is closed, all mappers and reducer should stop processing
done := make(chan lang.PlaceholderType) done := make(chan lang.PlaceholderType)
writer := newGuardedWriter(options.ctx, output, done) writer := newGuardedWriter(options.ctx, output, done)
var closeOnce sync.Once var closeOnce sync.Once
// use atomic.Value to avoid data race
var retErr errorx.AtomicError var retErr errorx.AtomicError
finish := func() { finish := func() {
closeOnce.Do(func() { closeOnce.Do(func() {
@@ -145,28 +186,41 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
go func() { go func() {
defer func() { defer func() {
drain(collector) drain(collector)
if r := recover(); r != nil { if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r)) panicChan.write(r)
} else {
finish()
} }
finish()
}() }()
reducer(collector, writer, cancel) reducer(collector, writer, cancel)
}() }()
go executeMappers(options.ctx, func(item interface{}, w Writer) { go executeMappers(mapperContext{
mapper(item, w, cancel) ctx: options.ctx,
}, source, collector, done, options.workers) mapper: func(item interface{}, w Writer) {
mapper(item, w, cancel)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})
value, ok := <-output select {
if err := retErr.Load(); err != nil { case <-options.ctx.Done():
return nil, err cancel(context.DeadlineExceeded)
} else if ok { return nil, context.DeadlineExceeded
return value, nil case v := <-panicChan.channel:
} else { panic(v)
return nil, ErrReduceNoOutput case v, ok := <-output:
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return v, nil
} else {
return nil, ErrReduceNoOutput
}
} }
} }
@@ -175,18 +229,12 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { _, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
reducer(input, cancel) reducer(input, cancel)
// We need to write a placeholder to let MapReduce to continue on reducer done,
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
writer.Write(lang.Placeholder)
}, opts...) }, opts...)
return err if errors.Is(err, ErrReduceNoOutput) {
} return nil
}
// MapVoid maps all elements from given generate but no output. return err
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option) {
drain(Map(generate, func(item interface{}, writer Writer) {
mapper(item)
}, opts...))
} }
// WithContext customizes a mapreduce processing accepts a given ctx. // WithContext customizes a mapreduce processing accepts a given ctx.
@@ -216,12 +264,18 @@ func buildOptions(opts ...Option) *mapReduceOptions {
     return options
 }

-func buildSource(generate GenerateFunc) chan interface{} {
+func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
     source := make(chan interface{})
-    threading.GoSafe(func() {
-        defer close(source)
+    go func() {
+        defer func() {
+            if r := recover(); r != nil {
+                panicChan.write(r)
+            }
+            close(source)
+        }()
         generate(source)
-    })
+    }()

     return source
 }
@@ -233,39 +287,43 @@ func drain(channel <-chan interface{}) {
     }
 }

-func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
-    collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
+func executeMappers(mCtx mapperContext) {
     var wg sync.WaitGroup
     defer func() {
         wg.Wait()
-        close(collector)
+        close(mCtx.collector)
+        drain(mCtx.source)
     }()

-    pool := make(chan lang.PlaceholderType, workers)
-    writer := newGuardedWriter(ctx, collector, done)
-    for {
+    var failed int32
+    pool := make(chan lang.PlaceholderType, mCtx.workers)
+    writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
+    for atomic.LoadInt32(&failed) == 0 {
         select {
-        case <-ctx.Done():
+        case <-mCtx.ctx.Done():
             return
-        case <-done:
+        case <-mCtx.doneChan:
             return
         case pool <- lang.Placeholder:
-            item, ok := <-input
+            item, ok := <-mCtx.source
             if !ok {
                 <-pool
                 return
             }

             wg.Add(1)
-            // better to safely run caller defined method
-            threading.GoSafe(func() {
+            go func() {
                 defer func() {
+                    if r := recover(); r != nil {
+                        atomic.AddInt32(&failed, 1)
+                        mCtx.panicChan.write(r)
+                    }
                     wg.Done()
                     <-pool
                 }()
-                mapper(item, writer)
-            })
+                mCtx.mapper(item, writer)
+            }()
         }
     }
 }
@@ -311,3 +369,16 @@ func (gw guardedWriter) Write(v interface{}) {
         gw.channel <- v
     }
 }

+type onceChan struct {
+    channel chan interface{}
+    wrote   int32
+}
+
+func (oc *onceChan) write(val interface{}) {
+    if atomic.AddInt32(&oc.wrote, 1) > 1 {
+        return
+    }
+    oc.channel <- val
+}


@@ -0,0 +1,78 @@
//go:build go1.18
// +build go1.18
package mr
import (
"fmt"
"math/rand"
"runtime"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)
func FuzzMapReduce(f *testing.F) {
rand.Seed(time.Now().UnixNano())
f.Add(uint(10), uint(runtime.NumCPU()))
f.Fuzz(func(t *testing.T, num uint, workers uint) {
n := int64(num)%5000 + 5000
genPanic := rand.Intn(100) == 0
mapperPanic := rand.Intn(100) == 0
reducerPanic := rand.Intn(100) == 0
genIdx := rand.Int63n(n)
mapperIdx := rand.Int63n(n)
reducerIdx := rand.Int63n(n)
squareSum := (n - 1) * n * (2*n - 1) / 6
fn := func() (interface{}, error) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
return MapReduce(func(source chan<- interface{}) {
for i := int64(0); i < n; i++ {
source <- i
if genPanic && i == genIdx {
panic("foo")
}
}
}, func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int64)
if mapperPanic && v == mapperIdx {
panic("bar")
}
writer.Write(v * v)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var idx int64
var total int64
for v := range pipe {
if reducerPanic && idx == reducerIdx {
panic("baz")
}
total += v.(int64)
idx++
}
writer.Write(total)
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
}
if genPanic || mapperPanic || reducerPanic {
var buf strings.Builder
buf.WriteString(fmt.Sprintf("n: %d", n))
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
assert.Panicsf(t, func() { fn() }, buf.String())
} else {
val, err := fn()
assert.Nil(t, err)
assert.Equal(t, squareSum, val.(int64))
}
})
}
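The expected value squareSum above relies on the closed form 0*0 + 1*1 + ... + (n-1)*(n-1) = (n-1)*n*(2n-1)/6, so the fuzz target can check MapReduce's result without a second brute-force pass over the data. A quick stand-alone sanity check of that identity (illustrative only):

package main

import "fmt"

// squareSum returns 0*0 + 1*1 + ... + (n-1)*(n-1) via the closed form
// used by the fuzz test above.
func squareSum(n int64) int64 {
    return (n - 1) * n * (2*n - 1) / 6
}

func main() {
    const n = 10
    var brute int64
    for i := int64(0); i < n; i++ {
        brute += i * i
    }
    fmt.Println(brute, squareSum(n)) // both print 285
}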


@@ -0,0 +1,107 @@
//go:build fuzz
// +build fuzz
package mr
import (
"fmt"
"math/rand"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/threading"
"gopkg.in/cheggaaa/pb.v1"
)
// If the fuzz engine gets stuck, it only reports hung or unexpected without context,
// so we simulate the fuzz workload here as a regular test to make failures debuggable.
func TestMapReduceRandom(t *testing.T) {
rand.Seed(time.Now().UnixNano())
const (
times = 10000
nRange = 500
mega = 1024 * 1024
)
bar := pb.New(times).Start()
runner := threading.NewTaskRunner(runtime.NumCPU())
var wg sync.WaitGroup
wg.Add(times)
for i := 0; i < times; i++ {
runner.Schedule(func() {
start := time.Now()
defer func() {
if time.Since(start) > time.Minute {
t.Fatal("timeout")
}
wg.Done()
}()
t.Run(strconv.Itoa(i), func(t *testing.T) {
n := rand.Int63n(nRange)%nRange + nRange
workers := rand.Int()%50 + runtime.NumCPU()/2
genPanic := rand.Intn(100) == 0
mapperPanic := rand.Intn(100) == 0
reducerPanic := rand.Intn(100) == 0
genIdx := rand.Int63n(n)
mapperIdx := rand.Int63n(n)
reducerIdx := rand.Int63n(n)
squareSum := (n - 1) * n * (2*n - 1) / 6
fn := func() (interface{}, error) {
return MapReduce(func(source chan<- interface{}) {
for i := int64(0); i < n; i++ {
source <- i
if genPanic && i == genIdx {
panic("foo")
}
}
}, func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int64)
if mapperPanic && v == mapperIdx {
panic("bar")
}
writer.Write(v * v)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var idx int64
var total int64
for v := range pipe {
if reducerPanic && idx == reducerIdx {
panic("baz")
}
total += v.(int64)
idx++
}
writer.Write(total)
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
}
if genPanic || mapperPanic || reducerPanic {
var buf strings.Builder
buf.WriteString(fmt.Sprintf("n: %d", n))
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
assert.Panicsf(t, func() { fn() }, buf.String())
} else {
val, err := fn()
assert.Nil(t, err)
assert.Equal(t, squareSum, val.(int64))
}
bar.Increment()
})
})
}
wg.Wait()
bar.Finish()
}


@@ -11,8 +11,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx" "go.uber.org/goleak"
"github.com/tal-tech/go-zero/core/syncx"
) )
var errDummy = errors.New("dummy") var errDummy = errors.New("dummy")
@@ -22,6 +21,8 @@ func init() {
 }

 func TestFinish(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     var total uint32
     err := Finish(func() error {
         atomic.AddUint32(&total, 2)
@@ -39,14 +40,20 @@ func TestFinish(t *testing.T) {
 }

 func TestFinishNone(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     assert.Nil(t, Finish())
 }

 func TestFinishVoidNone(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     FinishVoid()
 }

 func TestFinishErr(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     var total uint32
     err := Finish(func() error {
         atomic.AddUint32(&total, 2)
@@ -63,6 +70,8 @@ func TestFinishErr(t *testing.T) {
 }

 func TestFinishVoid(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     var total uint32
     FinishVoid(func() {
         atomic.AddUint32(&total, 2)
@@ -75,70 +84,107 @@ func TestFinishVoid(t *testing.T) {
     assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
 }

-func TestMap(t *testing.T) {
-    tests := []struct {
-        mapper MapFunc
-        expect int
-    }{
-        {
-            mapper: func(item interface{}, writer Writer) {
-                v := item.(int)
-                writer.Write(v * v)
-            },
-            expect: 30,
-        },
-        {
-            mapper: func(item interface{}, writer Writer) {
-                v := item.(int)
-                if v%2 == 0 {
-                    return
-                }
-                writer.Write(v * v)
-            },
-            expect: 10,
-        },
-        {
-            mapper: func(item interface{}, writer Writer) {
-                v := item.(int)
-                if v%2 == 0 {
-                    panic(v)
-                }
-                writer.Write(v * v)
-            },
-            expect: 10,
-        },
-    }
-
-    for _, test := range tests {
-        t.Run(stringx.Rand(), func(t *testing.T) {
-            channel := Map(func(source chan<- interface{}) {
-                for i := 1; i < 5; i++ {
-                    source <- i
-                }
-            }, test.mapper, WithWorkers(-1))
-
-            var result int
-            for v := range channel {
-                result += v.(int)
-            }
-
-            assert.Equal(t, test.expect, result)
-        })
-    }
-}
+func TestForEach(t *testing.T) {
+    const tasks = 1000
+
+    t.Run("all", func(t *testing.T) {
+        defer goleak.VerifyNone(t)
+
+        var count uint32
+        ForEach(func(source chan<- interface{}) {
+            for i := 0; i < tasks; i++ {
+                source <- i
+            }
+        }, func(item interface{}) {
+            atomic.AddUint32(&count, 1)
+        }, WithWorkers(-1))
+
+        assert.Equal(t, tasks, int(count))
+    })
+
+    t.Run("odd", func(t *testing.T) {
+        defer goleak.VerifyNone(t)
+
+        var count uint32
+        ForEach(func(source chan<- interface{}) {
+            for i := 0; i < tasks; i++ {
+                source <- i
+            }
+        }, func(item interface{}) {
+            if item.(int)%2 == 0 {
+                atomic.AddUint32(&count, 1)
+            }
+        })
+
+        assert.Equal(t, tasks/2, int(count))
+    })
+
+    t.Run("all", func(t *testing.T) {
+        defer goleak.VerifyNone(t)
+
+        assert.PanicsWithValue(t, "foo", func() {
+            ForEach(func(source chan<- interface{}) {
+                for i := 0; i < tasks; i++ {
+                    source <- i
+                }
+            }, func(item interface{}) {
+                panic("foo")
+            })
+        })
+    })
+}
+
+func TestGeneratePanic(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
+    t.Run("all", func(t *testing.T) {
+        assert.PanicsWithValue(t, "foo", func() {
+            ForEach(func(source chan<- interface{}) {
+                panic("foo")
+            }, func(item interface{}) {
+            })
+        })
+    })
+}
+
+func TestMapperPanic(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
+    const tasks = 1000
+    var run int32
+    t.Run("all", func(t *testing.T) {
+        assert.PanicsWithValue(t, "foo", func() {
+            _, _ = MapReduce(func(source chan<- interface{}) {
+                for i := 0; i < tasks; i++ {
+                    source <- i
+                }
+            }, func(item interface{}, writer Writer, cancel func(error)) {
+                atomic.AddInt32(&run, 1)
+                panic("foo")
+            }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+            })
+        })
+        assert.True(t, atomic.LoadInt32(&run) < tasks/2)
+    })
 }

 func TestMapReduce(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     tests := []struct {
+        name        string
         mapper      MapperFunc
         reducer     ReducerFunc
         expectErr   error
         expectValue interface{}
     }{
         {
+            name:        "simple",
             expectErr:   nil,
             expectValue: 30,
         },
         {
+            name: "cancel with error",
             mapper: func(item interface{}, writer Writer, cancel func(error)) {
                 v := item.(int)
                 if v%3 == 0 {
@@ -149,6 +195,7 @@ func TestMapReduce(t *testing.T) {
             expectErr: errDummy,
         },
         {
+            name: "cancel with nil",
             mapper: func(item interface{}, writer Writer, cancel func(error)) {
                 v := item.(int)
                 if v%3 == 0 {
@@ -160,6 +207,7 @@ func TestMapReduce(t *testing.T) {
             expectValue: nil,
         },
         {
+            name: "cancel with more",
             reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
                 var result int
                 for item := range pipe {
@@ -174,36 +222,74 @@ func TestMapReduce(t *testing.T) {
         },
     }

-    for _, test := range tests {
-        t.Run(stringx.Rand(), func(t *testing.T) {
-            if test.mapper == nil {
-                test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
-                    v := item.(int)
-                    writer.Write(v * v)
-                }
-            }
-            if test.reducer == nil {
-                test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
-                    var result int
-                    for item := range pipe {
-                        result += item.(int)
-                    }
-                    writer.Write(result)
-                }
-            }
-            value, err := MapReduce(func(source chan<- interface{}) {
-                for i := 1; i < 5; i++ {
-                    source <- i
-                }
-            }, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
-
-            assert.Equal(t, test.expectErr, err)
-            assert.Equal(t, test.expectValue, value)
-        })
-    }
+    t.Run("MapReduce", func(t *testing.T) {
+        for _, test := range tests {
+            t.Run(test.name, func(t *testing.T) {
+                if test.mapper == nil {
+                    test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
+                        v := item.(int)
+                        writer.Write(v * v)
+                    }
+                }
+                if test.reducer == nil {
+                    test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+                        var result int
+                        for item := range pipe {
+                            result += item.(int)
+                        }
+                        writer.Write(result)
+                    }
+                }
+                value, err := MapReduce(func(source chan<- interface{}) {
+                    for i := 1; i < 5; i++ {
+                        source <- i
+                    }
+                }, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
+                assert.Equal(t, test.expectErr, err)
+                assert.Equal(t, test.expectValue, value)
+            })
+        }
+    })
+
+    t.Run("MapReduce", func(t *testing.T) {
+        for _, test := range tests {
+            t.Run(test.name, func(t *testing.T) {
+                if test.mapper == nil {
+                    test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
+                        v := item.(int)
+                        writer.Write(v * v)
+                    }
+                }
+                if test.reducer == nil {
+                    test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+                        var result int
+                        for item := range pipe {
+                            result += item.(int)
+                        }
+                        writer.Write(result)
+                    }
+                }
+                source := make(chan interface{})
+                go func() {
+                    for i := 1; i < 5; i++ {
+                        source <- i
+                    }
+                    close(source)
+                }()
+
+                value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
+                assert.Equal(t, test.expectErr, err)
+                assert.Equal(t, test.expectValue, value)
+            })
+        }
+    })
 }
 func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     assert.Panics(t, func() {
         MapReduce(func(source chan<- interface{}) {
             for i := 0; i < 10; i++ {
@@ -220,18 +306,23 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
 }

 func TestMapReduceVoid(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     var value uint32
     tests := []struct {
+        name        string
         mapper      MapperFunc
         reducer     VoidReducerFunc
         expectValue uint32
         expectErr   error
     }{
         {
+            name:        "simple",
             expectValue: 30,
             expectErr:   nil,
         },
         {
+            name: "cancel with error",
             mapper: func(item interface{}, writer Writer, cancel func(error)) {
                 v := item.(int)
                 if v%3 == 0 {
@@ -242,6 +333,7 @@ func TestMapReduceVoid(t *testing.T) {
             expectErr: errDummy,
         },
         {
+            name: "cancel with nil",
             mapper: func(item interface{}, writer Writer, cancel func(error)) {
                 v := item.(int)
                 if v%3 == 0 {
@@ -252,6 +344,7 @@ func TestMapReduceVoid(t *testing.T) {
             expectErr: ErrCancelWithNil,
         },
         {
+            name: "cancel with more",
             reducer: func(pipe <-chan interface{}, cancel func(error)) {
                 for item := range pipe {
                     result := atomic.AddUint32(&value, uint32(item.(int)))
@@ -265,7 +358,7 @@ func TestMapReduceVoid(t *testing.T) {
     }

     for _, test := range tests {
-        t.Run(stringx.Rand(), func(t *testing.T) {
+        t.Run(test.name, func(t *testing.T) {
             atomic.StoreUint32(&value, 0)

             if test.mapper == nil {
@@ -296,6 +389,8 @@ func TestMapReduceVoid(t *testing.T) {
 }

 func TestMapReduceVoidWithDelay(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     var result []int
     err := MapReduceVoid(func(source chan<- interface{}) {
         source <- 0
@@ -318,38 +413,64 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
     assert.Equal(t, 0, result[1])
 }

-func TestMapVoid(t *testing.T) {
-    const tasks = 1000
-    var count uint32
-    MapVoid(func(source chan<- interface{}) {
-        for i := 0; i < tasks; i++ {
-            source <- i
-        }
-    }, func(item interface{}) {
-        atomic.AddUint32(&count, 1)
-    })
-
-    assert.Equal(t, tasks, int(count))
-}
-
-func TestMapReducePanic(t *testing.T) {
-    v, err := MapReduce(func(source chan<- interface{}) {
-        source <- 0
-        source <- 1
-    }, func(item interface{}, writer Writer, cancel func(error)) {
-        i := item.(int)
-        writer.Write(i)
-    }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
-        for range pipe {
-            panic("panic")
-        }
-    })
-    assert.Nil(t, v)
-    assert.NotNil(t, err)
-    assert.Equal(t, "panic", err.Error())
+func TestMapReducePanic(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
+    assert.Panics(t, func() {
+        _, _ = MapReduce(func(source chan<- interface{}) {
+            source <- 0
+            source <- 1
+        }, func(item interface{}, writer Writer, cancel func(error)) {
+            i := item.(int)
+            writer.Write(i)
+        }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+            for range pipe {
+                panic("panic")
+            }
+        })
+    })
+}
+
+func TestMapReducePanicOnce(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
+    assert.Panics(t, func() {
+        _, _ = MapReduce(func(source chan<- interface{}) {
+            for i := 0; i < 100; i++ {
+                source <- i
+            }
+        }, func(item interface{}, writer Writer, cancel func(error)) {
+            i := item.(int)
+            if i == 0 {
+                panic("foo")
+            }
+            writer.Write(i)
+        }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+            for range pipe {
+                panic("bar")
+            }
+        })
+    })
+}
+
+func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
+    assert.Panics(t, func() {
+        _, _ = MapReduce(func(source chan<- interface{}) {
+            source <- 0
+            source <- 1
+        }, func(item interface{}, writer Writer, cancel func(error)) {
+            panic("foo")
+        }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
+            panic("bar")
+        })
+    })
 }

 func TestMapReduceVoidCancel(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     var result []int
     err := MapReduceVoid(func(source chan<- interface{}) {
         source <- 0
@@ -371,13 +492,15 @@ func TestMapReduceVoidCancel(t *testing.T) {
 }

 func TestMapReduceVoidCancelWithRemains(t *testing.T) {
-    var done syncx.AtomicBool
+    defer goleak.VerifyNone(t)
+
+    var done int32
     var result []int
     err := MapReduceVoid(func(source chan<- interface{}) {
         for i := 0; i < defaultWorkers*2; i++ {
             source <- i
         }
-        done.Set(true)
+        atomic.AddInt32(&done, 1)
     }, func(item interface{}, writer Writer, cancel func(error)) {
         i := item.(int)
         if i == defaultWorkers/2 {
@@ -392,10 +515,12 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
     })
     assert.NotNil(t, err)
     assert.Equal(t, "anything", err.Error())
-    assert.True(t, done.True())
+    assert.Equal(t, int32(1), done)
 }

 func TestMapReduceWithoutReducerWrite(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     uids := []int{1, 2, 3}
     res, err := MapReduce(func(source chan<- interface{}) {
         for _, uid := range uids {
@@ -412,33 +537,54 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) {
 }

 func TestMapReduceVoidPanicInReducer(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
     const message = "foo"
-    var done syncx.AtomicBool
-    err := MapReduceVoid(func(source chan<- interface{}) {
-        for i := 0; i < defaultWorkers*2; i++ {
-            source <- i
-        }
-        done.Set(true)
-    }, func(item interface{}, writer Writer, cancel func(error)) {
-        i := item.(int)
-        writer.Write(i)
-    }, func(pipe <-chan interface{}, cancel func(error)) {
-        panic(message)
-    }, WithWorkers(1))
-    assert.NotNil(t, err)
-    assert.Equal(t, message, err.Error())
-    assert.True(t, done.True())
+    assert.Panics(t, func() {
+        var done int32
+        _ = MapReduceVoid(func(source chan<- interface{}) {
+            for i := 0; i < defaultWorkers*2; i++ {
+                source <- i
+            }
+            atomic.AddInt32(&done, 1)
+        }, func(item interface{}, writer Writer, cancel func(error)) {
+            i := item.(int)
+            writer.Write(i)
+        }, func(pipe <-chan interface{}, cancel func(error)) {
+            panic(message)
+        }, WithWorkers(1))
+    })
+}
+
+func TestForEachWithContext(t *testing.T) {
+    defer goleak.VerifyNone(t)
+
+    var done int32
+    ctx, cancel := context.WithCancel(context.Background())
+    ForEach(func(source chan<- interface{}) {
+        for i := 0; i < defaultWorkers*2; i++ {
+            source <- i
+        }
+        atomic.AddInt32(&done, 1)
+    }, func(item interface{}) {
+        i := item.(int)
+        if i == defaultWorkers/2 {
+            cancel()
+        }
+    }, WithContext(ctx))
 }

 func TestMapReduceWithContext(t *testing.T) {
-    var done syncx.AtomicBool
+    defer goleak.VerifyNone(t)
+
+    var done int32
     var result []int
     ctx, cancel := context.WithCancel(context.Background())
     err := MapReduceVoid(func(source chan<- interface{}) {
         for i := 0; i < defaultWorkers*2; i++ {
             source <- i
         }
-        done.Set(true)
+        atomic.AddInt32(&done, 1)
     }, func(item interface{}, writer Writer, c func(error)) {
         i := item.(int)
         if i == defaultWorkers/2 {
@@ -452,7 +598,7 @@ func TestMapReduceWithContext(t *testing.T) {
         }
     }, WithContext(ctx))
     assert.NotNil(t, err)
-    assert.Equal(t, ErrReduceNoOutput, err)
+    assert.Equal(t, context.DeadlineExceeded, err)
 }

 func BenchmarkMapReduce(b *testing.B) {
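The test changes above also show the migration path for the removed MapVoid: side-effect-only iteration now goes through ForEach. A hedged sketch of what caller code could look like after this change, assuming the package is imported from github.com/zeromicro/go-zero/core/mr (the import path is inferred from the module rename and is not shown in this diff):

package main

import (
    "fmt"
    "sync/atomic"

    "github.com/zeromicro/go-zero/core/mr"
)

func main() {
    var handled uint32

    // Before this change: mr.MapVoid(generate, func(item interface{}) { ... })
    // After: mr.ForEach with the same generate/consume shape.
    mr.ForEach(func(source chan<- interface{}) {
        for i := 0; i < 100; i++ {
            source <- i
        }
    }, func(item interface{}) {
        atomic.AddUint32(&handled, 1)
    })

    fmt.Println(atomic.LoadUint32(&handled)) // 100
}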


@@ -11,7 +11,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
const ( const (


@@ -15,7 +15,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
// DefaultMemProfileRate is the default memory profiling rate. // DefaultMemProfileRate is the default memory profiling rate.


@@ -10,8 +10,8 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/threading" "github.com/zeromicro/go-zero/core/threading"
) )
const ( const (


@@ -8,7 +8,7 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
const timeFormat = "0102150405" const timeFormat = "0102150405"


@@ -8,8 +8,8 @@ import (
"time" "time"
"github.com/olekukonko/tablewriter" "github.com/olekukonko/tablewriter"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/threading" "github.com/zeromicro/go-zero/core/threading"
) )
type ( type (


@@ -1,6 +1,6 @@
 package prof

-import "github.com/tal-tech/go-zero/core/utils"
+import "github.com/zeromicro/go-zero/core/utils"

 type (
     // A ProfilePoint is a profile time point.


@@ -3,7 +3,7 @@ package prof
 import (
     "testing"

-    "github.com/tal-tech/go-zero/core/utils"
+    "github.com/zeromicro/go-zero/core/utils"
 )

 func TestProfiler(t *testing.T) {

core/prof/runtime.go

@@ -0,0 +1,30 @@
package prof
import (
"fmt"
"runtime"
"time"
)
const (
defaultInterval = time.Second * 5
mega = 1024 * 1024
)
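// DisplayStats periodically prints goroutine count and memory stats
// (Alloc, TotalAlloc and Sys in MiB, plus NumGC) from a background goroutine.
// The optional interval argument overrides the 5-second default.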
func DisplayStats(interval ...time.Duration) {
duration := defaultInterval
for _, val := range interval {
duration = val
}
go func() {
ticker := time.NewTicker(duration)
defer ticker.Stop()
for range ticker.C {
var m runtime.MemStats
runtime.ReadMemStats(&m)
fmt.Printf("Goroutines: %d, Alloc: %vm, TotalAlloc: %vm, Sys: %vm, NumGC: %v\n",
runtime.NumGoroutine(), m.Alloc/mega, m.TotalAlloc/mega, m.Sys/mega, m.NumGC)
}
}()
}
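A minimal usage sketch for the new stats monitor: call DisplayStats once at startup and it keeps printing in the background. Only the prof call comes from this diff; the surrounding main function and the 10-second interval are illustrative assumptions.

package main

import (
    "time"

    "github.com/zeromicro/go-zero/core/prof"
)

func main() {
    // Print runtime stats every 10 seconds instead of the default 5.
    prof.DisplayStats(time.Second * 10)

    // ... run the actual service; sleep here just keeps the demo alive.
    time.Sleep(time.Minute)
}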


@@ -6,9 +6,9 @@ import (
"sync" "sync"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading" "github.com/zeromicro/go-zero/core/threading"
) )
var ( var (


@@ -4,7 +4,7 @@ import (
"errors" "errors"
"sync/atomic" "sync/atomic"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
// ErrNoAvailablePusher indicates no pusher available. // ErrNoAvailablePusher indicates no pusher available.


@@ -1,6 +1,6 @@
 package queue

-import "github.com/tal-tech/go-zero/core/errorx"
+import "github.com/zeromicro/go-zero/core/errorx"

 // A MultiPusher is a pusher that can push messages to multiple underlying pushers.
 type MultiPusher struct {


@@ -6,11 +6,11 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/rescue" "github.com/zeromicro/go-zero/core/rescue"
"github.com/tal-tech/go-zero/core/stat" "github.com/zeromicro/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/threading" "github.com/zeromicro/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
const queueName = "queue" const queueName = "queue"


@@ -6,8 +6,8 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/mathx"
) )
var ( var (


@@ -1,6 +1,6 @@
 package rescue

-import "github.com/tal-tech/go-zero/core/logx"
+import "github.com/zeromicro/go-zero/core/logx"

 // Recover is used with defer to do cleanup on panics.
 // Use it like:


@@ -5,7 +5,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
func init() { func init() {

Some files were not shown because too many files have changed in this diff.