Compare commits

..

103 Commits

Author SHA1 Message Date
Kevin Wan
e31128650e Revert "🐞 fix(gen): pg gen of insert (#1591)" (#1598)
This reverts commit cc4c4928e0.
2022-03-01 20:27:59 +08:00
Kevin Wan
168740b64d chore: upgrade etcd (#1597) 2022-03-01 20:16:44 +08:00
toutou_o
cc4c4928e0 🐞 fix(gen): pg gen of insert (#1591)
Co-authored-by: kurimi1 <d0n41df@gmail.com>
2022-03-01 19:53:23 +08:00
Fyn
fba6543b23 fix: goctl api dart support form tag (#1596) 2022-03-01 16:17:37 +08:00
Kevin Wan
877eb6ac56 Update readme.md
add producthunt.
2022-03-01 16:11:17 +08:00
Kevin Wan
259a5a13e7 chore: fix data race (#1593) 2022-02-28 23:17:51 +08:00
Fyn
cf7c7cb392 build: update goctl dependency ddl-parser to v1.0.3 (#1586)
* build: update goctl dependency ddl-parser to v1.0.3

* fix: race condition when testing logx

Resolves: #1587
2022-02-28 17:31:59 +08:00
ccx
86d01e2e99 test: add testcase for FIFO Queue in collection module (#1589)
cover the case of non-zero value for q.Header when q.Elements expands
2022-02-28 17:15:11 +08:00
Kevin Wan
7a28e19a27 Update readme-cn.md 2022-02-27 23:30:14 +08:00
Kevin Wan
900ea63d68 Update readme-cn.md
add migration notice.
2022-02-27 23:29:45 +08:00
Kevin Wan
87ab86cdd0 Update readme.md 2022-02-27 23:26:03 +08:00
Kevin Wan
0697494ffd Update readme.md
Add migrate steps.
2022-02-27 23:25:22 +08:00
anqiansong
ffd69a2f5e Fix bug int overflow while build goctl on arch 386 (#1582)
Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-02-27 10:51:57 +08:00
Kevin Wan
66f10bb5e6 chore: add goctl command help (#1578) 2022-02-26 17:02:04 +08:00
Kevin Wan
8131a0e777 Update readme.md
update chat link.
2022-02-25 23:02:09 +08:00
Kevin Wan
32a557dff6 Update readme.md
add discord.
2022-02-25 23:01:15 +08:00
Fyn
db949e40f1 feat: supports importValue for more path formats (#1569)
`importValueRegex` now can match more path formats

Resolves: #1568
2022-02-25 11:16:57 +08:00
Kevin Wan
e0454138e0 update goctl to go 1.16 for io/fs usage (#1571)
* update goctl to go 1.16 for io/fs usage

* feat: support pg serial type for auto_increment (#1563)

* add correct example for pg's url

* 🐞 fix: merge

* 🐞 fix: pg default port

*  feat: support serial type

Co-authored-by: kurimi1 <d0n41df@gmail.com>

* chore: format code

Co-authored-by: toutou_o <33993460+kurimi1@users.noreply.github.com>
Co-authored-by: kurimi1 <d0n41df@gmail.com>
2022-02-24 13:58:53 +08:00
toutou_o
3b07ed1b97 feat: support pg serial type for auto_increment (#1563)
* add correct example for pg's url

* 🐞 fix: merge

* 🐞 fix: pg default port

*  feat: support serial type

Co-authored-by: kurimi1 <d0n41df@gmail.com>
2022-02-24 13:39:31 +08:00
anqiansong
daa98f5a27 Feature: Add goctl env (#1557) 2022-02-21 10:19:33 +08:00
Kevin Wan
842656aa90 feat: log 404 requests with traceid (#1554) 2022-02-19 20:50:33 +08:00
Kevin Wan
aa29036cb3 feat: support ctx in sql model generation (#1551) 2022-02-17 10:28:55 +08:00
Kevin Wan
607bae27fa feat: support ctx in sqlx/sqlc, listed in ROADMAP (#1535)
* feat: support ctx in sqlx/sqlc

* chore: update roadmap

* fix: context.Canceled should be acceptable

* use %w to wrap errors

* chore: remove unused vars
2022-02-16 19:31:43 +08:00
Kevin Wan
7c63676be4 docs: add go-zero users (#1546) 2022-02-16 12:02:52 +08:00
Kevin Wan
9e113909b3 ignore context.Canceled for redis breaker (#1545) 2022-02-15 21:31:30 +08:00
Kevin Wan
bd105474ca chore: update help message (#1544) 2022-02-15 21:19:40 +08:00
Mikael
a078f5d764 add the serviceAccount of deployment (#1543)
Co-authored-by: 977231903@qq.com <>
2022-02-15 20:57:14 +08:00
Kevin Wan
b215fa3ee6 fix #1541 (#1542) 2022-02-15 18:40:26 +08:00
mlr3000
50b1928502 chore:use struct pointer (#1538) 2022-02-15 11:34:48 +08:00
Kevin Wan
493e3bcf4b docs: update roadmap (#1537) 2022-02-15 08:37:03 +08:00
Kevin Wan
6deb80625d fix issue of default migrate version (#1536)
* fix issue of default migrate version

* chore: update console colors
2022-02-14 23:09:32 +08:00
Kevin Wan
6ab051568c Update readme-cn.md
add go-zero users
2022-02-14 16:57:48 +08:00
Kevin Wan
2732d3cdae chore: refactor cache (#1532) 2022-02-13 18:04:31 +08:00
chenquan
e8c307e4dc feat: support ctx in Cache (#1518)
* feature: support ctx in `Cache`

Signed-off-by: chenquan <chenquan.dev@foxmail.com>

* fix: `errors.Is` instead of `=`

Signed-off-by: chenquan <chenquan.dev@foxmail.com>
2022-02-13 17:28:14 +08:00
Kevin Wan
84ddc660c4 chore: goctl format issue (#1531) 2022-02-13 13:17:19 +08:00
Kevin Wan
e60e707955 upgrade grpc version (#1530) 2022-02-12 23:58:41 +08:00
Kevin Wan
cf4321b2d0 fix #1525 (#1527) 2022-02-11 23:04:57 +08:00
chenquan
1993faf2f8 fix: fix a typo (#1522)
Signed-off-by: chenquan <chenquan.dev@foxmail.com>
2022-02-11 21:15:45 +08:00
Kevin Wan
0ce85376bf chore: update goctl version to 1.3.2 (#1524) 2022-02-11 21:02:50 +08:00
Kevin Wan
a40254156f refactor: refactor yaml unmarshaler (#1517) 2022-02-09 17:22:52 +08:00
chenquan
05cc62f5ff chore: optimize yaml unmarshaler (#1513) 2022-02-09 16:57:00 +08:00
chenquan
9c2c90e533 chore: make error clearer (#1514) 2022-02-09 14:40:05 +08:00
Kevin Wan
822ee2e1c5 feat: update go-redis to v8, support ctx in redis methods (#1507)
* feat: update go-redis to v8, support ctx in redis methods

* fix compile errors

* chore: remove unused const

* chore: add tracing log on redis
2022-02-09 11:06:06 +08:00
anqiansong
77482c8946 fixes typo (#1511)
Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-02-08 22:16:38 +08:00
Kevin Wan
7ef0ab3119 Update readme-cn.md 2022-02-08 11:47:33 +08:00
anqiansong
8bd89a297a feature: Add goctl completion (#1505)
* feature: Add `goctl completion`

* Update const

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-02-08 10:50:21 +08:00
Kevin Wan
bb75cc796e test: change fuzz tests (#1504) 2022-02-05 09:44:01 +08:00
Kevin Wan
0fdd8f54eb ci: add test for win (#1503)
* ci: add test for win

* ci: update check names

* ci: use go build instead of go test to verify win test

* fix: windows test failure

* chore: disable logs in tests
2022-02-05 00:06:23 +08:00
anqiansong
b1ffc464cd fix typo: goctl protoc usage (#1502)
Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-02-03 22:13:02 +08:00
Kevin Wan
50174960e4 chore: update command comment (#1501) 2022-02-02 22:02:08 +08:00
Kevin Wan
8f46eab977 fix: goctl not compile on windows (#1500) 2022-02-01 13:58:08 +08:00
Kevin Wan
ec299085f5 docs: update tal-tech to zeromico in docs (#1498) 2022-02-01 13:03:30 +08:00
Kevin Wan
7727d70634 chore: update goctl version (#1497) 2022-02-01 09:50:26 +08:00
Kevin Wan
5f9d101bc6 feat: add runtime stats monitor (#1496) 2022-02-01 01:34:25 +08:00
Kevin Wan
6c2abe7474 fix: goroutine stuck on edge case (#1495)
* fix: goroutine stuck on edge case

* refactor: simplify mapreduce implementation
2022-01-30 13:09:21 +08:00
Kevin Wan
14a902c1a7 feat: handling panic in mapreduce, panic in calling goroutine, not inside goroutines (#1490)
* feat: handle panic

* chore: update fuzz test

* chore: optimize square sum algorithm
2022-01-28 10:59:41 +08:00
Kevin Wan
5ad6a6d229 Update readme-cn.md
add slogan
2022-01-27 17:16:30 +08:00
Kevin Wan
6f4b97864a chore: improve migrate confirmation (#1488) 2022-01-27 11:30:35 +08:00
Kevin Wan
0e0abc3a95 chore: update warning message (#1487) 2022-01-26 23:47:57 +08:00
anqiansong
696fda1db4 patch: goctl migrate (#1485)
* * Add signal check
* Add deprecated pkg check

* fix typo `replacementBuilderx`

* output to console if verbose

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-26 23:24:25 +08:00
Kevin Wan
c1d2634427 chore: update go version for goctl (#1484) 2022-01-26 14:27:43 +08:00
Kevin Wan
4b7a680ac5 refactor: rename from tal-tech to zeromicro for goctl (#1481) 2022-01-25 23:15:07 +08:00
Kevin Wan
b3e7d2901f Feature/trie ac automation (#1479)
* fix: trie ac automation issues

* fix: trie ac automation issues

* fix: trie ac automation issues

* fix: trie ac automation issues
2022-01-25 11:14:56 +08:00
anqiansong
cdf7ec213c fix #1468 (#1478)
Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-24 22:23:20 +08:00
Kevin Wan
f1102fb262 chore: optimize string search with Aho–Corasick algorithm (#1476)
* chore: optimize string search with Aho–Corasick algorithm

* chore: optimize keywords replacer

* fix: replacer bugs

* chore: reorder members
2022-01-23 23:37:02 +08:00
Keqi Huang
09d1fad6e0 Polish the words in readme.md (#1475) 2022-01-22 12:20:11 +08:00
Kevin Wan
379c65a3ef docs: add go-zero users (#1473) 2022-01-20 22:36:17 +08:00
Kevin Wan
fdc7f64d6f chore: update unauthorized callback calling order (#1469)
* chore: update unauthorized callback calling order

* chore: add comments
2022-01-20 21:09:45 +08:00
anqiansong
df0f8ed59e Fix/issue#1289 (#1460)
* fix #1289

* Add unit test case

* fix `jwtTransKey`

* fix `jwtTransKey`

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-18 11:52:30 +08:00
anqiansong
c903966fc7 patch: save missing templates to disk (#1463)
Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-18 10:45:05 +08:00
anqiansong
e57fa8ff53 Fix/issue#1447 (#1458)
* Add data for template to render

* fix #1447

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-18 10:36:38 +08:00
Kevin Wan
bf2feee5b7 feat: implement console plain output for debug logs (#1456)
* feat: implement console plain output for debug logs

* chore: rename console encoding to plain

* chore: refactor names
2022-01-17 12:43:15 +08:00
Letian Jiang
ce05c429fc chore: check interface satisfaction w/o allocating new variable (#1454) 2022-01-16 23:34:42 +08:00
Kevin Wan
272a3f347d chore: remove jwt deprecated (#1452) 2022-01-16 10:34:44 +08:00
shenbaise9527
13db7a1931 feat: 支持redis的LTrim方法 (#1443) 2022-01-16 10:27:34 +08:00
Kevin Wan
468c237189 chore: upgrade dependencies (#1444)
* chore: upgrade dependencies

* ci: upgrade go to 1.15
2022-01-14 11:01:02 +08:00
Kevin Wan
b9b80c068b ci: add translator action (#1441) 2022-01-12 17:57:39 +08:00
anqiansong
9b592b3dee Feature rpc protoc (#1251)
* code generation by protoc

* generate pb by protoc direct

* support: grpc code generation by protoc directly

* format code

* check --go_out & --go-grpc_out

* fix typo

* Update version

* fix typo

* optimize: remove deprecated unit test

* format code

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-11 20:34:25 +08:00
Kevin Wan
2203809e5e chore: fix typo (#1437) 2022-01-11 20:23:59 +08:00
Kevin Wan
8d6d37f71e remove unnecessary drain, fix data race (#1435)
* remove unnecessary drain, fix data race

* chore: fix parameter order

* refactor: rename MapVoid to ForEach in mr
2022-01-11 16:17:51 +08:00
Kevin Wan
ea4f2af67f fix: mr goroutine leak on context deadline (#1433)
* fix: mr goroutine leak on context deadline

* test: update fx test check
2022-01-10 22:06:10 +08:00
Kevin Wan
53af194ef9 chore: refactor periodlimit (#1428)
* chore: refactor periodlimit

* chore: add comments
2022-01-09 16:22:34 +08:00
Kevin Wan
5e0e2d2b14 docs: add go-zero users (#1425) 2022-01-08 21:41:27 +08:00
Kevin Wan
74c99184c5 docs: add go-zero users (#1424) 2022-01-08 17:08:44 +08:00
Kevin Wan
eb4b86137a fix: golint issue (#1423) 2022-01-08 16:06:56 +08:00
Kevin Wan
9c4f4f3b4e update docs (#1421) 2022-01-07 12:08:45 +08:00
spectatorMrZ
240132e7c7 Fix pg model generation without tag (#1407)
1. fix pg model struct haven't tag
2. add pg command test from datasource
2022-01-07 10:45:26 +08:00
anqiansong
9d67fc4cfb feat: Add migrate (#1419)
* Add migrate

* Remove unused module

* refactor filename

* rename refactor to migrate

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-06 18:48:34 +08:00
Kevin Wan
892f93a716 docs: update install readme (#1417) 2022-01-05 12:31:49 +08:00
Kevin Wan
ba6a7c9dc8 chore: refactor rest/timeouthandler (#1415) 2022-01-05 11:17:10 +08:00
Kevin Wan
a91c3907a8 feat: rename module from tal-tech to zeromicro (#1413) 2022-01-04 15:51:32 +08:00
Kevin Wan
e267d94ee1 chore: update go-zero to v1.2.5 (#1410) 2022-01-03 21:54:53 +08:00
anqiansong
89ce5e492b refactor file|path (#1409)
Co-authored-by: anqiansong <anqiansong@bytedance.com>
2022-01-03 21:32:40 +08:00
Kevin Wan
290de6aa96 docs: update roadmap (#1405) 2022-01-02 21:30:02 +08:00
Kevin Wan
a7aeb8ac0e feat: support tls for etcd client (#1390)
* feat: support tls for etcd client

* chore: fix typo

* refactor: rename TrustedCAFile to CACertFile

* docs: add comments

* fix: missing tls registration

* feat: add InsecureSkipVerify config for testing
2022-01-02 20:23:50 +08:00
Kevin Wan
a8e7fafebf refactor: optimize fx (#1404)
* refactor: optimize fx

* chore: add more comments

* ci: make test robust
2022-01-02 14:56:30 +08:00
Kevin Wan
7cc64070b1 docs: update goctl installation command (#1403) 2022-01-02 14:31:31 +08:00
Kevin Wan
c19d2637ea feat: implement fx.NoneMatch, fx.First, fx.Last (#1402)
* chore: use workers from options in fx.unlimitedWalk

* feat: add fx.NoneMatch

* feat: add fx.First, fx.Last

* chore: add more comments

* docs: add mr readme
2022-01-02 13:33:15 +08:00
Kevin Wan
fe1da14332 chore: simplify mapreduce (#1401) 2022-01-01 19:24:35 +08:00
anqiansong
8e9110cedf fix #1330 (#1382)
Co-authored-by: anqiansong <anqiansong@bytedance.com>
2021-12-30 20:44:04 +08:00
Kevin Wan
d6ff30a570 chore: fix golint issues (#1396) 2021-12-30 17:44:15 +08:00
Kevin Wan
b98d46bfd6 chore: update goctl version (#1394) 2021-12-30 15:30:16 +08:00
Kevin Wan
768936b256 ci: remove 386 binaries (#1393) 2021-12-30 15:18:24 +08:00
505 changed files with 8583 additions and 3088 deletions

View File

@@ -7,32 +7,50 @@ on:
branches: [ master ]
jobs:
build:
name: Build
test-linux:
name: Linux
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.15
id: go
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.14
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Get dependencies
run: |
go get -v -t -d ./...
- name: Get dependencies
run: |
go get -v -t -d ./...
- name: Lint
run: |
go vet -stdmethods=false $(go list ./...)
go install mvdan.cc/gofumpt@latest
test -z "$(gofumpt -s -l -extra .)" || echo "Please run 'gofumpt -l -w -extra .'"
- name: Lint
run: |
go vet -stdmethods=false $(go list ./...)
go install mvdan.cc/gofumpt@latest
test -z "$(gofumpt -s -l -extra .)" || echo "Please run 'gofumpt -l -w -extra .'"
- name: Test
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
- name: Test
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
- name: Codecov
uses: codecov/codecov-action@v2
- name: Codecov
uses: codecov/codecov-action@v2
test-win:
name: Windows
runs-on: windows-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.15
- name: Checkout codebase
uses: actions/checkout@v2
- name: Test
run: |
go mod verify
go mod download
go test -v -race ./...
cd tools/goctl && go build -v goctl.go

18
.github/workflows/issue-translator.yml vendored Normal file
View File

@@ -0,0 +1,18 @@
name: 'issue-translator'
on:
issue_comment:
types: [created]
issues:
types: [opened]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: tomsun28/issues-translate-action@v2.6
with:
IS_MODIFY_TITLE: true
# not require, default false, . Decide whether to modify the issue title
# if true, the robot account @Issues-translate-bot must have modification permissions, invite @Issues-translate-bot to your project or use your custom bot.
CUSTOM_BOT_NOTE: Bot detected the issue body's language is not English, translate it automatically. 👯👭🏻🧑‍🤝‍🧑👫🧑🏿‍🤝‍🧑🏻👩🏾‍🤝‍👨🏿👬🏿
# not require. Customize the translation robot prefix message.

View File

@@ -1,30 +0,0 @@
on:
release:
types: [created]
jobs:
releases-matrix:
name: Release goctl binary
runs-on: ubuntu-latest
strategy:
matrix:
# build and publish in parallel: linux/386, linux/amd64, linux/arm64,
# windows/386, windows/amd64, windows/arm64, darwin/amd64, darwin/arm64
goos: [linux, windows, darwin]
goarch: ["386", amd64, arm64]
exclude:
- goarch: "386"
goos: darwin
- goarch: "386"
goos: windows
steps:
- uses: actions/checkout@v2
- uses: wangyoucao577/go-release-action@v1.22
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
goos: ${{ matrix.goos }}
goarch: ${{ matrix.goarch }}
goversion: "https://dl.google.com/go/go1.17.5.linux-amd64.tar.gz"
project_path: "tools/goctl"
binary_name: "goctl"
extra_files: tools/goctl/goctl.md

3
.gitignore vendored
View File

@@ -16,7 +16,8 @@
**/logs
# for test purpose
adhoc
**/adhoc
**/testdata
# gitlab ci
.cache

View File

@@ -40,7 +40,7 @@ We will help you to contribute in different areas like filing issues, developing
getting your work reviewed and merged.
If you have questions about the development process,
feel free to [file an issue](https://github.com/tal-tech/go-zero/issues/new/choose).
feel free to [file an issue](https://github.com/zeromicro/go-zero/issues/new/choose).
## Find something to work on
@@ -50,10 +50,10 @@ Here is how you get started.
### Find a good first topic
[go-zero](https://github.com/tal-tech/go-zero) has beginner-friendly issues that provide a good first issue.
For example, [go-zero](https://github.com/tal-tech/go-zero) has
[help wanted](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
[good first issue](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
[go-zero](https://github.com/zeromicro/go-zero) has beginner-friendly issues that provide a good first issue.
For example, [go-zero](https://github.com/zeromicro/go-zero) has
[help wanted](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
[good first issue](https://github.com/zeromicro/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
labels for issues that should not need deep knowledge of the system.
We can help new contributors who wish to work on such issues.
@@ -79,7 +79,7 @@ This is a rough outline of what a contributor's workflow looks like:
- Create a topic branch from where to base the contribution. This is usually master.
- Make commits of logical units.
- Push changes in a topic branch to a personal fork of the repository.
- Submit a pull request to [go-zero](https://github.com/tal-tech/go-zero).
- Submit a pull request to [go-zero](https://github.com/zeromicro/go-zero).
## Creating Pull Requests

View File

@@ -14,13 +14,15 @@ We hope that the items listed below will inspire further engagement from the com
## 2021 Q4
- [x] Support `username/password` authentication in ETCD
- [x] Support `SSL/TLS` in ETCD
- [x] Support `SSL/TLS` in `zRPC`
- [x] Support `TLS` in redis connections
- [x] Support `goctl bug` to report bugs conveniently
## 2022
- [ ] Support `goctl mock` command to start a mocking server with given `.api` file
- [x] Support `context` in redis related methods for timeout and tracing
- [x] Support `context` in sql related methods for timeout and tracing
- [ ] Support `context` in mongodb related methods for timeout and tracing
- [ ] Add `httpx.Client` with governance, like circuit breaker etc.
- [ ] Support `goctl doctor` command to report potential issues for given service
- [ ] Support `context` in redis related methods for timeout and tracing
- [ ] Support `context` in sql related methods for timeout and tracing
- [ ] Support `context` in mongodb related methods for timeout and tracing
- [ ] Support `goctl mock` command to start a mocking server with given `.api` file

View File

@@ -4,8 +4,8 @@ import (
"errors"
"strconv"
"github.com/tal-tech/go-zero/core/hash"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/hash"
"github.com/zeromicro/go-zero/core/stores/redis"
)
const (

View File

@@ -4,7 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
)
func TestRedisBitSet_New_Set_Test(t *testing.T) {

View File

@@ -6,11 +6,11 @@ import (
"strings"
"sync"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/timex"
)
const (

View File

@@ -8,7 +8,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/stat"
)
func init() {

View File

@@ -6,7 +6,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/stat"
)
func init() {

View File

@@ -4,8 +4,8 @@ import (
"math"
"time"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/mathx"
)
const (

View File

@@ -7,9 +7,9 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/stat"
)
const (

View File

@@ -8,8 +8,8 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/iox"
"github.com/zeromicro/go-zero/core/lang"
)
func TestEnterToContinue(t *testing.T) {

View File

@@ -7,7 +7,7 @@ import (
"encoding/base64"
"errors"
"github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/logx"
)
// ErrPaddingSize indicates bad padding size.

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/fs"
)
const (

View File

@@ -6,9 +6,9 @@ import (
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/syncx"
)
const (

View File

@@ -61,3 +61,41 @@ func TestPutMore(t *testing.T) {
assert.Equal(t, string(element), string(body.([]byte)))
}
}
func TestPutMoreWithHeaderNotZero(t *testing.T) {
elements := [][]byte{
[]byte("hello"),
[]byte("world"),
[]byte("again"),
}
queue := NewQueue(4)
for i := range elements {
queue.Put(elements[i])
}
// take 1
body, ok := queue.Take()
assert.True(t, ok)
element, ok := body.([]byte)
assert.True(t, ok)
assert.Equal(t, element, []byte("hello"))
// put more
queue.Put([]byte("b4"))
queue.Put([]byte("b5")) // will store in elements[0]
queue.Put([]byte("b6")) // cause expansion
results := [][]byte{
[]byte("world"),
[]byte("again"),
[]byte("b4"),
[]byte("b5"),
[]byte("b6"),
}
for _, element := range results {
body, ok := queue.Take()
assert.True(t, ok)
assert.Equal(t, string(element), string(body.([]byte)))
}
}

View File

@@ -4,7 +4,7 @@ import (
"sync"
"time"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/timex"
)
type (

View File

@@ -6,7 +6,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/stringx"
)
const duration = time.Millisecond * 50

View File

@@ -4,7 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/stringx"
)
func TestSafeMap(t *testing.T) {

View File

@@ -1,8 +1,8 @@
package collection
import (
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
)
const (

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/logx"
)
func init() {

View File

@@ -5,9 +5,9 @@ import (
"fmt"
"time"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/timex"
)
const drainWorkers = 8

View File

@@ -8,10 +8,10 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex"
)
const (

View File

@@ -7,7 +7,7 @@ import (
"os"
"path"
"github.com/tal-tech/go-zero/core/mapping"
"github.com/zeromicro/go-zero/core/mapping"
)
var loaders = map[string]func([]byte, interface{}) error{

View File

@@ -6,8 +6,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/hash"
"github.com/zeromicro/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/hash"
)
func TestLoadConfig_notExists(t *testing.T) {

View File

@@ -7,7 +7,7 @@ import (
"strings"
"sync"
"github.com/tal-tech/go-zero/core/iox"
"github.com/zeromicro/go-zero/core/iox"
)
// PropertyError represents a configuration error message.

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/fs"
)
func TestProperties(t *testing.T) {

View File

@@ -3,7 +3,7 @@ package contextx
import (
"context"
"github.com/tal-tech/go-zero/core/mapping"
"github.com/zeromicro/go-zero/core/mapping"
)
const contextTagKey = "ctx"

View File

@@ -1,7 +1,14 @@
package discov
import "github.com/tal-tech/go-zero/core/discov/internal"
import "github.com/zeromicro/go-zero/core/discov/internal"
// RegisterAccount registers the username/password to the given etcd cluster.
func RegisterAccount(endpoints []string, user, pass string) {
internal.AddAccount(endpoints, user, pass)
}
// RegisterTLS registers the CertFile/CertKeyFile/CACertFile to the given etcd.
func RegisterTLS(endpoints []string, certFile, certKeyFile, caFile string,
insecureSkipVerify bool) error {
return internal.AddTLS(endpoints, certFile, certKeyFile, caFile, insecureSkipVerify)
}

View File

@@ -4,8 +4,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/stringx"
)
func TestRegisterAccount(t *testing.T) {

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"strings"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/discov/internal"
)
const (

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/discov/internal"
)
var mockLock sync.Mutex

View File

@@ -2,12 +2,23 @@ package discov
import "errors"
var (
// errEmptyEtcdHosts indicates that etcd hosts are empty.
errEmptyEtcdHosts = errors.New("empty etcd hosts")
// errEmptyEtcdKey indicates that etcd key is empty.
errEmptyEtcdKey = errors.New("empty etcd key")
)
// EtcdConf is the config item with the given key on etcd.
type EtcdConf struct {
Hosts []string
Key string
User string `json:",optional"`
Pass string `json:",optional"`
Hosts []string
Key string
User string `json:",optional"`
Pass string `json:",optional"`
CertFile string `json:",optional"`
CertKeyFile string `json:",optional=CertFile"`
CACertFile string `json:",optional=CertFile"`
InsecureSkipVerify bool `json:",optional"`
}
// HasAccount returns if account provided.
@@ -15,12 +26,17 @@ func (c EtcdConf) HasAccount() bool {
return len(c.User) > 0 && len(c.Pass) > 0
}
// HasTLS returns if TLS CertFile/CertKeyFile/CACertFile are provided.
func (c EtcdConf) HasTLS() bool {
return len(c.CertFile) > 0 && len(c.CertKeyFile) > 0 && len(c.CACertFile) > 0
}
// Validate validates c.
func (c EtcdConf) Validate() error {
if len(c.Hosts) == 0 {
return errors.New("empty etcd hosts")
return errEmptyEtcdHosts
} else if len(c.Key) == 0 {
return errors.New("empty etcd key")
return errEmptyEtcdKey
} else {
return nil
}

View File

@@ -1,17 +1,25 @@
package internal
import "sync"
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"sync"
)
var (
accounts = make(map[string]Account)
tlsConfigs = make(map[string]*tls.Config)
lock sync.RWMutex
)
// Account holds the username/password for an etcd cluster.
type Account struct {
User string
Pass string
}
var (
accounts = make(map[string]Account)
lock sync.RWMutex
)
// AddAccount adds the username/password for the given etcd cluster.
func AddAccount(endpoints []string, user, pass string) {
lock.Lock()
defer lock.Unlock()
@@ -22,6 +30,33 @@ func AddAccount(endpoints []string, user, pass string) {
}
}
// AddTLS adds the tls cert files for the given etcd cluster.
func AddTLS(endpoints []string, certFile, certKeyFile, caFile string, insecureSkipVerify bool) error {
cert, err := tls.LoadX509KeyPair(certFile, certKeyFile)
if err != nil {
return err
}
caData, err := ioutil.ReadFile(caFile)
if err != nil {
return err
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caData)
lock.Lock()
defer lock.Unlock()
tlsConfigs[getClusterKey(endpoints)] = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: pool,
InsecureSkipVerify: insecureSkipVerify,
}
return nil
}
// GetAccount gets the username/password for the given etcd cluster.
func GetAccount(endpoints []string) (Account, bool) {
lock.RLock()
defer lock.RUnlock()
@@ -29,3 +64,12 @@ func GetAccount(endpoints []string) (Account, bool) {
account, ok := accounts[getClusterKey(endpoints)]
return account, ok
}
// GetTLS gets the tls config for the given etcd cluster.
func GetTLS(endpoints []string) (*tls.Config, bool) {
lock.RLock()
defer lock.RUnlock()
cfg, ok := tlsConfigs[getClusterKey(endpoints)]
return cfg, ok
}

View File

@@ -4,7 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/stringx"
)
func TestAccount(t *testing.T) {

View File

@@ -9,11 +9,11 @@ import (
"sync"
"time"
"github.com/tal-tech/go-zero/core/contextx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/contextx"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading"
clientv3 "go.etcd.io/etcd/client/v3"
)
@@ -337,6 +337,9 @@ func DialClient(endpoints []string) (EtcdClient, error) {
cfg.Username = account.User
cfg.Password = account.Pass
}
if tlsCfg, ok := GetTLS(endpoints); ok {
cfg.TLS = tlsCfg
}
return clientv3.New(cfg)
}

View File

@@ -7,10 +7,10 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/contextx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/contextx"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stringx"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)

View File

@@ -1,12 +1,12 @@
package discov
import (
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading"
clientv3 "go.etcd.io/etcd/client/v3"
)
@@ -145,16 +145,23 @@ func (p *Publisher) revoke(cli internal.EtcdClient) {
}
}
// WithPubEtcdAccount provides the etcd username/password.
func WithPubEtcdAccount(user, pass string) PubOption {
return func(pub *Publisher) {
internal.AddAccount(pub.endpoints, user, pass)
}
}
// WithId customizes a Publisher with the id.
func WithId(id int64) PubOption {
return func(publisher *Publisher) {
publisher.id = id
}
}
// WithPubEtcdAccount provides the etcd username/password.
func WithPubEtcdAccount(user, pass string) PubOption {
return func(pub *Publisher) {
RegisterAccount(pub.endpoints, user, pass)
}
}
// WithPubEtcdTLS provides the etcd CertFile/CertKeyFile/CACertFile.
func WithPubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify bool) PubOption {
return func(pub *Publisher) {
logx.Must(RegisterTLS(pub.endpoints, certFile, certKeyFile, caFile, insecureSkipVerify))
}
}

View File

@@ -8,10 +8,10 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stringx"
clientv3 "go.etcd.io/etcd/client/v3"
)

View File

@@ -4,8 +4,9 @@ import (
"sync"
"sync/atomic"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx"
)
type (
@@ -58,9 +59,17 @@ func Exclusive() SubOption {
}
}
// WithSubEtcdAccount provides the etcd username/password.
func WithSubEtcdAccount(user, pass string) SubOption {
return func(sub *Subscriber) {
internal.AddAccount(sub.endpoints, user, pass)
RegisterAccount(sub.endpoints, user, pass)
}
}
// WithSubEtcdTLS provides the etcd CertFile/CertKeyFile/CACertFile.
func WithSubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify bool) SubOption {
return func(sub *Subscriber) {
logx.Must(RegisterTLS(sub.endpoints, certFile, certKeyFile, caFile, insecureSkipVerify))
}
}

View File

@@ -5,8 +5,8 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/discov/internal"
"github.com/zeromicro/go-zero/core/stringx"
)
const (

View File

@@ -11,10 +11,12 @@ type (
errorArray []error
)
// Add adds err to be.
func (be *BatchError) Add(err error) {
if err != nil {
be.errs = append(be.errs, err)
// Add adds errs to be, nil errors are ignored.
func (be *BatchError) Add(errs ...error) {
for _, err := range errs {
if err != nil {
be.errs = append(be.errs, err)
}
}
}

View File

@@ -4,7 +4,7 @@ import (
"sync"
"time"
"github.com/tal-tech/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/threading"
)
// A DelayExecutor delays a tasks on given delay interval.

View File

@@ -3,8 +3,8 @@ package executors
import (
"time"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex"
)
// A LessExecutor is an executor to limit execution once within given time interval.

View File

@@ -5,7 +5,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/timex"
)
func TestLessExecutor_DoOrDiscard(t *testing.T) {

View File

@@ -6,11 +6,11 @@ import (
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/timex"
)
const idleRound = 10

View File

@@ -8,7 +8,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/timex"
)
const threshold = 10

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/fs"
)
const (

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/fs"
)
func TestSplitLineChunks(t *testing.T) {

View File

@@ -5,6 +5,9 @@ import (
"os"
)
// errExceedFileSize indicates that the file size is exceeded.
var errExceedFileSize = errors.New("exceed file size")
// A RangeReader is used to read a range of content from a file.
type RangeReader struct {
file *os.File
@@ -29,7 +32,7 @@ func (rr *RangeReader) Read(p []byte) (n int, err error) {
}
if rr.stop < rr.start || rr.start >= stat.Size() {
return 0, errors.New("exceed file size")
return 0, errExceedFileSize
}
if rr.stop-rr.start < int64(len(p)) {

View File

@@ -5,7 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/fs"
)
func TestRangeReader(t *testing.T) {

View File

@@ -4,7 +4,7 @@ import (
"io/ioutil"
"os"
"github.com/tal-tech/go-zero/core/hash"
"github.com/zeromicro/go-zero/core/hash"
)
// TempFileWithText creates the temporary file with the given content,

View File

@@ -1,6 +1,6 @@
package fx
import "github.com/tal-tech/go-zero/core/threading"
import "github.com/zeromicro/go-zero/core/threading"
// Parallel runs fns parallelly and waits for done.
func Parallel(fns ...func()) {

View File

@@ -1,6 +1,6 @@
package fx
import "github.com/tal-tech/go-zero/core/errorx"
import "github.com/zeromicro/go-zero/core/errorx"
const defaultRetryTimes = 3

View File

@@ -4,9 +4,9 @@ import (
"sort"
"sync"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/threading"
)
const (
@@ -90,6 +90,8 @@ func Range(source <-chan interface{}) Stream {
func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
for item := range s.source {
if !predicate(item) {
// make sure the former goroutine not block, and current func returns fast.
go drain(s.source)
return false
}
}
@@ -103,6 +105,8 @@ func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
func (s Stream) AnyMach(predicate func(item interface{}) bool) bool {
for item := range s.source {
if predicate(item) {
// make sure the former goroutine not block, and current func returns fast.
go drain(s.source)
return true
}
}
@@ -186,8 +190,7 @@ func (s Stream) Distinct(fn KeyFunc) Stream {
// Done waits all upstreaming operations to be done.
func (s Stream) Done() {
for range s.source {
}
drain(s.source)
}
// Filter filters the items by the given FilterFunc.
@@ -199,9 +202,22 @@ func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream {
}, opts...)
}
// First returns the first item, nil if no items.
func (s Stream) First() interface{} {
for item := range s.source {
// make sure the former goroutine not block, and current func returns fast.
go drain(s.source)
return item
}
return nil
}
// ForAll handles the streaming elements from the source and no later streams.
func (s Stream) ForAll(fn ForAllFunc) {
fn(s.source)
// avoid goroutine leak on fn not consuming all items.
go drain(s.source)
}
// ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
@@ -246,11 +262,14 @@ func (s Stream) Head(n int64) Stream {
}
if n == 0 {
// let successive method go ASAP even we have more items to skip
// why we don't just break the loop, because if breaks,
// this former goroutine will block forever, which will cause goroutine leak.
close(source)
// why we don't just break the loop, and drain to consume all items.
// because if breaks, this former goroutine will block forever,
// which will cause goroutine leak.
drain(s.source)
}
}
// not enough items in s.source, but we need to let successive method to go ASAP.
if n > 0 {
close(source)
}
@@ -259,6 +278,13 @@ func (s Stream) Head(n int64) Stream {
return Range(source)
}
// Last returns the last item, or nil if no items.
func (s Stream) Last() (item interface{}) {
for item = range s.source {
}
return
}
// Map converts each item to another corresponding item, which means it's a 1:1 model.
func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
return s.Walk(func(item interface{}, pipe chan<- interface{}) {
@@ -280,6 +306,21 @@ func (s Stream) Merge() Stream {
return Range(source)
}
// NoneMatch returns whether all elements of this stream don't match the provided predicate.
// May not evaluate the predicate on all elements if not necessary for determining the result.
// If the stream is empty then true is returned and the predicate is not evaluated.
func (s Stream) NoneMatch(predicate func(item interface{}) bool) bool {
for item := range s.source {
if predicate(item) {
// make sure the former goroutine not block, and current func returns fast.
go drain(s.source)
return false
}
}
return true
}
// Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
func (s Stream) Parallel(fn ParallelFunc, opts ...Option) {
s.Walk(func(item interface{}, pipe chan<- interface{}) {
@@ -411,15 +452,12 @@ func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
var wg sync.WaitGroup
pool := make(chan lang.PlaceholderType, option.workers)
for {
for item := range s.source {
// important, used in another goroutine
val := item
pool <- lang.Placeholder
item, ok := <-s.source
if !ok {
<-pool
break
}
wg.Add(1)
// better to safely run caller defined method
threading.GoSafe(func() {
defer func() {
@@ -427,7 +465,7 @@ func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
<-pool
}()
fn(item, pipe)
fn(val, pipe)
})
}
@@ -439,22 +477,19 @@ func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
}
func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
pipe := make(chan interface{}, defaultWorkers)
pipe := make(chan interface{}, option.workers)
go func() {
var wg sync.WaitGroup
for {
item, ok := <-s.source
if !ok {
break
}
for item := range s.source {
// important, used in another goroutine
val := item
wg.Add(1)
// better to safely run caller defined method
threading.GoSafe(func() {
defer wg.Done()
fn(item, pipe)
fn(val, pipe)
})
}
@@ -465,14 +500,14 @@ func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
return Range(pipe)
}
// UnlimitedWorkers lets the caller to use as many workers as the tasks.
// UnlimitedWorkers lets the caller use as many workers as the tasks.
func UnlimitedWorkers() Option {
return func(opts *rxOptions) {
opts.unlimitedWorkers = true
}
}
// WithWorkers lets the caller to customize the concurrent workers.
// WithWorkers lets the caller customize the concurrent workers.
func WithWorkers(workers int) Option {
return func(opts *rxOptions) {
if workers < minWorkers {
@@ -483,6 +518,7 @@ func WithWorkers(workers int) Option {
}
}
// buildOptions returns a rxOptions with given customizations.
func buildOptions(opts ...Option) *rxOptions {
options := newOptions()
for _, opt := range opts {
@@ -492,6 +528,13 @@ func buildOptions(opts ...Option) *rxOptions {
return options
}
// drain drains the given channel.
func drain(channel <-chan interface{}) {
for range channel {
}
}
// newOptions returns a default rxOptions.
func newOptions() *rxOptions {
return &rxOptions{
workers: defaultWorkers,

View File

@@ -13,324 +13,494 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/stringx"
"go.uber.org/goleak"
)
func TestBuffer(t *testing.T) {
const N = 5
var count int32
var wait sync.WaitGroup
wait.Add(1)
From(func(source chan<- interface{}) {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
runCheckedTest(t, func(t *testing.T) {
const N = 5
var count int32
var wait sync.WaitGroup
wait.Add(1)
From(func(source chan<- interface{}) {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 2*N; i++ {
select {
case source <- i:
atomic.AddInt32(&count, 1)
case <-ticker.C:
wait.Done()
return
for i := 0; i < 2*N; i++ {
select {
case source <- i:
atomic.AddInt32(&count, 1)
case <-ticker.C:
wait.Done()
return
}
}
}
}).Buffer(N).ForAll(func(pipe <-chan interface{}) {
wait.Wait()
// why N+1, because take one more to wait for sending into the channel
assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
}).Buffer(N).ForAll(func(pipe <-chan interface{}) {
wait.Wait()
// why N+1, because take one more to wait for sending into the channel
assert.Equal(t, int32(N+1), atomic.LoadInt32(&count))
})
})
}
func TestBufferNegative(t *testing.T) {
var result int
Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
runCheckedTest(t, func(t *testing.T) {
var result int
Just(1, 2, 3, 4).Buffer(-1).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
})
assert.Equal(t, 10, result)
})
assert.Equal(t, 10, result)
}
func TestCount(t *testing.T) {
tests := []struct {
name string
elements []interface{}
}{
{
name: "no elements with nil",
},
{
name: "no elements",
elements: []interface{}{},
},
{
name: "1 element",
elements: []interface{}{1},
},
{
name: "multiple elements",
elements: []interface{}{1, 2, 3},
},
}
runCheckedTest(t, func(t *testing.T) {
tests := []struct {
name string
elements []interface{}
}{
{
name: "no elements with nil",
},
{
name: "no elements",
elements: []interface{}{},
},
{
name: "1 element",
elements: []interface{}{1},
},
{
name: "multiple elements",
elements: []interface{}{1, 2, 3},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
val := Just(test.elements...).Count()
assert.Equal(t, len(test.elements), val)
})
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
val := Just(test.elements...).Count()
assert.Equal(t, len(test.elements), val)
})
}
})
}
func TestDone(t *testing.T) {
var count int32
Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
time.Sleep(time.Millisecond * 100)
atomic.AddInt32(&count, int32(item.(int)))
}).Done()
assert.Equal(t, int32(6), count)
runCheckedTest(t, func(t *testing.T) {
var count int32
Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
time.Sleep(time.Millisecond * 100)
atomic.AddInt32(&count, int32(item.(int)))
}).Done()
assert.Equal(t, int32(6), count)
})
}
func TestJust(t *testing.T) {
var result int
Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
runCheckedTest(t, func(t *testing.T) {
var result int
Just(1, 2, 3, 4).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
})
assert.Equal(t, 10, result)
})
assert.Equal(t, 10, result)
}
func TestDistinct(t *testing.T) {
var result int
Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
return item
}).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
runCheckedTest(t, func(t *testing.T) {
var result int
Just(4, 1, 3, 2, 3, 4).Distinct(func(item interface{}) interface{} {
return item
}).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
})
assert.Equal(t, 10, result)
})
assert.Equal(t, 10, result)
}
func TestFilter(t *testing.T) {
var result int
Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
return item.(int)%2 == 0
}).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
runCheckedTest(t, func(t *testing.T) {
var result int
Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
return item.(int)%2 == 0
}).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
})
assert.Equal(t, 6, result)
})
}
func TestFirst(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
assert.Nil(t, Just().First())
assert.Equal(t, "foo", Just("foo").First())
assert.Equal(t, "foo", Just("foo", "bar").First())
})
assert.Equal(t, 6, result)
}
func TestForAll(t *testing.T) {
var result int
Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
return item.(int)%2 == 0
}).ForAll(func(pipe <-chan interface{}) {
for item := range pipe {
result += item.(int)
}
runCheckedTest(t, func(t *testing.T) {
var result int
Just(1, 2, 3, 4).Filter(func(item interface{}) bool {
return item.(int)%2 == 0
}).ForAll(func(pipe <-chan interface{}) {
for item := range pipe {
result += item.(int)
}
})
assert.Equal(t, 6, result)
})
assert.Equal(t, 6, result)
}
func TestGroup(t *testing.T) {
var groups [][]int
Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
v := item.(int)
return v / 10
}).ForEach(func(item interface{}) {
v := item.([]interface{})
var group []int
for _, each := range v {
group = append(group, each.(int))
}
groups = append(groups, group)
})
runCheckedTest(t, func(t *testing.T) {
var groups [][]int
Just(10, 11, 20, 21).Group(func(item interface{}) interface{} {
v := item.(int)
return v / 10
}).ForEach(func(item interface{}) {
v := item.([]interface{})
var group []int
for _, each := range v {
group = append(group, each.(int))
}
groups = append(groups, group)
})
assert.Equal(t, 2, len(groups))
for _, group := range groups {
assert.Equal(t, 2, len(group))
assert.True(t, group[0]/10 == group[1]/10)
}
assert.Equal(t, 2, len(groups))
for _, group := range groups {
assert.Equal(t, 2, len(group))
assert.True(t, group[0]/10 == group[1]/10)
}
})
}
func TestHead(t *testing.T) {
var result int
Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
runCheckedTest(t, func(t *testing.T) {
var result int
Just(1, 2, 3, 4).Head(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
})
assert.Equal(t, 3, result)
})
assert.Equal(t, 3, result)
}
func TestHeadZero(t *testing.T) {
assert.Panics(t, func() {
Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
return nil, nil
runCheckedTest(t, func(t *testing.T) {
assert.Panics(t, func() {
Just(1, 2, 3, 4).Head(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
return nil, nil
})
})
})
}
func TestHeadMore(t *testing.T) {
var result int
Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
runCheckedTest(t, func(t *testing.T) {
var result int
Just(1, 2, 3, 4).Head(6).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
})
assert.Equal(t, 10, result)
})
}
func TestLast(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
goroutines := runtime.NumGoroutine()
assert.Nil(t, Just().Last())
assert.Equal(t, "foo", Just("foo").Last())
assert.Equal(t, "bar", Just("foo", "bar").Last())
// let scheduler schedule first
runtime.Gosched()
assert.Equal(t, goroutines, runtime.NumGoroutine())
})
assert.Equal(t, 10, result)
}
func TestMap(t *testing.T) {
log.SetOutput(ioutil.Discard)
runCheckedTest(t, func(t *testing.T) {
log.SetOutput(ioutil.Discard)
tests := []struct {
mapper MapFunc
expect int
}{
{
mapper: func(item interface{}) interface{} {
v := item.(int)
return v * v
tests := []struct {
mapper MapFunc
expect int
}{
{
mapper: func(item interface{}) interface{} {
v := item.(int)
return v * v
},
expect: 30,
},
expect: 30,
},
{
mapper: func(item interface{}) interface{} {
v := item.(int)
if v%2 == 0 {
return 0
}
return v * v
},
expect: 10,
},
{
mapper: func(item interface{}) interface{} {
v := item.(int)
if v%2 == 0 {
panic(v)
}
return v * v
},
expect: 10,
},
}
// Map(...) works even WithWorkers(0)
for i, test := range tests {
t.Run(stringx.Rand(), func(t *testing.T) {
var result int
var workers int
if i%2 == 0 {
workers = 0
} else {
workers = runtime.NumCPU()
}
From(func(source chan<- interface{}) {
for i := 1; i < 5; i++ {
source <- i
}
}).Map(test.mapper, WithWorkers(workers)).Reduce(
func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
{
mapper: func(item interface{}) interface{} {
v := item.(int)
if v%2 == 0 {
return 0
}
return result, nil
})
return v * v
},
expect: 10,
},
{
mapper: func(item interface{}) interface{} {
v := item.(int)
if v%2 == 0 {
panic(v)
}
return v * v
},
expect: 10,
},
}
assert.Equal(t, test.expect, result)
})
}
// Map(...) works even WithWorkers(0)
for i, test := range tests {
t.Run(stringx.Rand(), func(t *testing.T) {
var result int
var workers int
if i%2 == 0 {
workers = 0
} else {
workers = runtime.NumCPU()
}
From(func(source chan<- interface{}) {
for i := 1; i < 5; i++ {
source <- i
}
}).Map(test.mapper, WithWorkers(workers)).Reduce(
func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
})
assert.Equal(t, test.expect, result)
})
}
})
}
func TestMerge(t *testing.T) {
Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
assert.ElementsMatch(t, []interface{}{1, 2, 3, 4}, item.([]interface{}))
})
})
}
func TestParallelJust(t *testing.T) {
var count int32
Just(1, 2, 3).Parallel(func(item interface{}) {
time.Sleep(time.Millisecond * 100)
atomic.AddInt32(&count, int32(item.(int)))
}, UnlimitedWorkers())
assert.Equal(t, int32(6), count)
runCheckedTest(t, func(t *testing.T) {
var count int32
Just(1, 2, 3).Parallel(func(item interface{}) {
time.Sleep(time.Millisecond * 100)
atomic.AddInt32(&count, int32(item.(int)))
}, UnlimitedWorkers())
assert.Equal(t, int32(6), count)
})
}
func TestReverse(t *testing.T) {
Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Reverse().Merge().ForEach(func(item interface{}) {
assert.ElementsMatch(t, []interface{}{4, 3, 2, 1}, item.([]interface{}))
})
})
}
func TestSort(t *testing.T) {
var prev int
Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
return a.(int) < b.(int)
}).ForEach(func(item interface{}) {
next := item.(int)
assert.True(t, prev < next)
prev = next
runCheckedTest(t, func(t *testing.T) {
var prev int
Just(5, 3, 7, 1, 9, 6, 4, 8, 2).Sort(func(a, b interface{}) bool {
return a.(int) < b.(int)
}).ForEach(func(item interface{}) {
next := item.(int)
assert.True(t, prev < next)
prev = next
})
})
}
func TestSplit(t *testing.T) {
assert.Panics(t, func() {
Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done()
runCheckedTest(t, func(t *testing.T) {
assert.Panics(t, func() {
Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(0).Done()
})
var chunks [][]interface{}
Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item interface{}) {
chunk := item.([]interface{})
chunks = append(chunks, chunk)
})
assert.EqualValues(t, [][]interface{}{
{1, 2, 3, 4},
{5, 6, 7, 8},
{9, 10},
}, chunks)
})
var chunks [][]interface{}
Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Split(4).ForEach(func(item interface{}) {
chunk := item.([]interface{})
chunks = append(chunks, chunk)
})
assert.EqualValues(t, [][]interface{}{
{1, 2, 3, 4},
{5, 6, 7, 8},
{9, 10},
}, chunks)
}
func TestTail(t *testing.T) {
var result int
Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
runCheckedTest(t, func(t *testing.T) {
var result int
Just(1, 2, 3, 4).Tail(2).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
}
return result, nil
})
assert.Equal(t, 7, result)
})
assert.Equal(t, 7, result)
}
func TestTailZero(t *testing.T) {
assert.Panics(t, func() {
Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
return nil, nil
runCheckedTest(t, func(t *testing.T) {
assert.Panics(t, func() {
Just(1, 2, 3, 4).Tail(0).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
return nil, nil
})
})
})
}
func TestWalk(t *testing.T) {
var result int
Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
if item.(int)%2 != 0 {
pipe <- item
}
}, UnlimitedWorkers()).ForEach(func(item interface{}) {
result += item.(int)
runCheckedTest(t, func(t *testing.T) {
var result int
Just(1, 2, 3, 4, 5).Walk(func(item interface{}, pipe chan<- interface{}) {
if item.(int)%2 != 0 {
pipe <- item
}
}, UnlimitedWorkers()).ForEach(func(item interface{}) {
result += item.(int)
})
assert.Equal(t, 9, result)
})
}
func TestStream_AnyMach(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 4
}))
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 0
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 2
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 2
}))
})
}
func TestStream_AllMach(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
assetEqual(
t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return true
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return false
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return item.(int) == 1
}),
)
})
}
func TestStream_NoneMatch(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
assetEqual(
t, true, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
return false
}),
)
assetEqual(
t, false, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
return true
}),
)
assetEqual(
t, true, Just(1, 2, 3).NoneMatch(func(item interface{}) bool {
return item.(int) == 4
}),
)
})
}
func TestConcat(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
a1 := []interface{}{1, 2, 3}
a2 := []interface{}{4, 5, 6}
s1 := Just(a1...)
s2 := Just(a2...)
stream := Concat(s1, s2)
var items []interface{}
for item := range stream.source {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
return items[i].(int) < items[j].(int)
})
ints := make([]interface{}, 0)
ints = append(ints, a1...)
ints = append(ints, a2...)
assetEqual(t, ints, items)
})
}
func TestStream_Skip(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
equal(t, Just(1, 2, 3, 4).Skip(3), []interface{}{4})
assert.Panics(t, func() {
Just(1, 2, 3, 4).Skip(-1)
})
})
}
func TestStream_Concat(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
stream := Just(1).Concat(Just(2), Just(3))
var items []interface{}
for item := range stream.source {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
return items[i].(int) < items[j].(int)
})
assetEqual(t, []interface{}{1, 2, 3}, items)
just := Just(1)
equal(t, just.Concat(just), []interface{}{1})
})
assert.Equal(t, 9, result)
}
func BenchmarkParallelMapReduce(b *testing.B) {
@@ -377,6 +547,12 @@ func BenchmarkMapReduce(b *testing.B) {
}).Map(mapper).Reduce(reducer)
}
func assetEqual(t *testing.T, except, data interface{}) {
if !reflect.DeepEqual(except, data) {
t.Errorf(" %v, want %v", data, except)
}
}
func equal(t *testing.T, stream Stream, data []interface{}) {
items := make([]interface{}, 0)
for item := range stream.source {
@@ -387,85 +563,7 @@ func equal(t *testing.T, stream Stream, data []interface{}) {
}
}
func assetEqual(t *testing.T, except, data interface{}) {
if !reflect.DeepEqual(except, data) {
t.Errorf(" %v, want %v", data, except)
}
}
func TestStream_AnyMach(t *testing.T) {
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 4
}))
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 0
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 2
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return item.(int) == 2
}))
}
func TestStream_AllMach(t *testing.T) {
assetEqual(
t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return true
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return false
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return item.(int) == 1
}),
)
}
func TestConcat(t *testing.T) {
a1 := []interface{}{1, 2, 3}
a2 := []interface{}{4, 5, 6}
s1 := Just(a1...)
s2 := Just(a2...)
stream := Concat(s1, s2)
var items []interface{}
for item := range stream.source {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
return items[i].(int) < items[j].(int)
})
ints := make([]interface{}, 0)
ints = append(ints, a1...)
ints = append(ints, a2...)
assetEqual(t, ints, items)
}
func TestStream_Skip(t *testing.T) {
assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
equal(t, Just(1, 2, 3, 4).Skip(3), []interface{}{4})
assert.Panics(t, func() {
Just(1, 2, 3, 4).Skip(-1)
})
}
func TestStream_Concat(t *testing.T) {
stream := Just(1).Concat(Just(2), Just(3))
var items []interface{}
for item := range stream.source {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
return items[i].(int) < items[j].(int)
})
assetEqual(t, []interface{}{1, 2, 3}, items)
just := Just(1)
equal(t, just.Concat(just), []interface{}{1})
func runCheckedTest(t *testing.T, fn func(t *testing.T)) {
defer goleak.VerifyNone(t)
fn(t)
}

View File

@@ -6,8 +6,8 @@ import (
"strconv"
"sync"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/mapping"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/mapping"
)
const (

View File

@@ -6,7 +6,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/mathx"
)
const (

View File

@@ -9,8 +9,8 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/stringx"
)
func TestReadText(t *testing.T) {

View File

@@ -51,5 +51,5 @@ func unmarshalUseNumber(decoder *json.Decoder, v interface{}) error {
}
func formatError(v string, err error) error {
return fmt.Errorf("string: `%s`, error: `%s`", v, err.Error())
return fmt.Errorf("string: `%s`, error: `%w`", v, err)
}

View File

@@ -4,8 +4,8 @@ package lang
var Placeholder PlaceholderType
type (
// GenericType can be used to hold any type.
GenericType = interface{}
// AnyType can be used to hold any type.
AnyType = interface{}
// PlaceholderType represents a placeholder type.
PlaceholderType = struct{}
)

View File

@@ -5,12 +5,11 @@ import (
"strconv"
"time"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/redis"
)
const (
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
periodScript = `local limit = tonumber(ARGV[1])
// to be compatible with aliyun redis, we cannot use `local key = KEYS[1]` to reuse the key
const periodScript = `local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call("INCRBY", KEYS[1], 1)
if current == 1 then
@@ -23,8 +22,6 @@ elseif current == limit then
else
return 0
end`
zoneDiff = 3600 * 8 // GMT+8 for our services
)
const (
// Unknown means not initialized state.
@@ -104,7 +101,9 @@ func (h *PeriodLimit) Take(key string) (int, error) {
func (h *PeriodLimit) calcExpireSeconds() int {
if h.align {
unix := time.Now().Unix() + zoneDiff
now := time.Now()
_, offset := now.Zone()
unix := now.Unix() + int64(offset)
return h.period - int(unix%int64(h.period))
}
@@ -112,6 +111,8 @@ func (h *PeriodLimit) calcExpireSeconds() int {
}
// Align returns a func to customize a PeriodLimit with alignment.
// For example, if we want to limit end users with 5 sms verification messages every day,
// we need to align with the local timezone and the start of the day.
func Align() PeriodOption {
return func(l *PeriodLimit) {
l.align = true

View File

@@ -5,8 +5,8 @@ import (
"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
)
func TestPeriodLimit_Take(t *testing.T) {
@@ -23,10 +23,9 @@ func TestPeriodLimit_RedisUnavailable(t *testing.T) {
const (
seconds = 1
total = 100
quota = 5
)
l := NewPeriodLimit(seconds, quota, redis.NewRedis(s.Addr(), redis.NodeType), "periodlimit")
l := NewPeriodLimit(seconds, quota, redis.New(s.Addr()), "periodlimit")
s.Close()
val, err := l.Take("first")
assert.NotNil(t, err)

View File

@@ -7,8 +7,8 @@ import (
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis"
xrate "golang.org/x/time/rate"
)
@@ -85,8 +85,8 @@ func (lim *TokenLimiter) Allow() bool {
}
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate rate.
// Otherwise use Reserve or Wait.
// Use this method if you intend to drop / skip events that exceed the rate.
// Otherwise, use Reserve or Wait.
func (lim *TokenLimiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n)
}
@@ -112,7 +112,8 @@ func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
// Lua boolean false -> r Nil bulk reply
if err == redis.Nil {
return false
} else if err != nil {
}
if err != nil {
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)

View File

@@ -6,9 +6,9 @@ import (
"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/redis"
"github.com/tal-tech/go-zero/core/stores/redis/redistest"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
)
func init() {

View File

@@ -7,11 +7,11 @@ import (
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex"
)
const (

View File

@@ -8,11 +8,11 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/mathx"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/syncx"
)
const (

View File

@@ -3,7 +3,7 @@ package load
import (
"io"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/syncx"
)
// A ShedderGroup is a manager to manage key based shedders.

View File

@@ -4,8 +4,8 @@ import (
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stat"
)
type (

View File

@@ -3,10 +3,11 @@ package logx
// A LogConf is a logging config.
type LogConf struct {
ServiceName string `json:",optional"`
Mode string `json:",default=console,options=console|file|volume"`
Mode string `json:",default=console,options=[console,file,volume]"`
Encoding string `json:",default=json,options=[json,plain]"`
TimeFormat string `json:",optional"`
Path string `json:",default=logs"`
Level string `json:",default=info,options=info|error|severe"`
Level string `json:",default=info,options=[info,error,severe]"`
Compress bool `json:",optional"`
KeepDays int `json:",optional"`
StackCooldownMillis int `json:",default=100"`

View File

@@ -3,9 +3,10 @@ package logx
import (
"fmt"
"io"
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/timex"
)
const durationCallerDepth = 3
@@ -79,10 +80,15 @@ func (l *durationLogger) WithDuration(duration time.Duration) Logger {
}
func (l *durationLogger) write(writer io.Writer, level string, val interface{}) {
outputJson(writer, &durationLogger{
Timestamp: getTimestamp(),
Level: level,
Content: val,
Duration: l.Duration,
})
switch atomic.LoadUint32(&encoding) {
case plainEncodingType:
writePlainAny(writer, level, val, l.Duration)
default:
outputJson(writer, &durationLogger{
Timestamp: getTimestamp(),
Level: level,
Content: val,
Duration: l.Duration,
})
}
}

View File

@@ -3,6 +3,7 @@ package logx
import (
"log"
"strings"
"sync/atomic"
"testing"
"time"
@@ -37,6 +38,19 @@ func TestWithDurationInfo(t *testing.T) {
assert.True(t, strings.Contains(builder.String(), "duration"), builder.String())
}
func TestWithDurationInfoConsole(t *testing.T) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
var builder strings.Builder
log.SetOutput(&builder)
WithDuration(time.Second).Info("foo")
assert.True(t, strings.Contains(builder.String(), "ms"), builder.String())
}
func TestWithDurationInfof(t *testing.T) {
var builder strings.Builder
log.SetOutput(&builder)

View File

@@ -4,8 +4,8 @@ import (
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex"
)
type limitedExecutor struct {

View File

@@ -6,7 +6,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/timex"
)
func TestLimitedExecutor_logOrDiscard(t *testing.T) {

View File

@@ -1,6 +1,7 @@
package logx
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@@ -17,9 +18,9 @@ import (
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/sysx"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/iox"
"github.com/zeromicro/go-zero/core/sysx"
"github.com/zeromicro/go-zero/core/timex"
)
const (
@@ -31,6 +32,15 @@ const (
SevereLevel
)
const (
jsonEncodingType = iota
plainEncodingType
jsonEncoding = "json"
plainEncoding = "plain"
plainEncodingSep = '\t'
)
const (
accessFilename = "access.log"
errorFilename = "error.log"
@@ -62,9 +72,10 @@ var (
// ErrLogServiceNameNotSet is an error that indicates that the service name is not set.
ErrLogServiceNameNotSet = errors.New("log service name must be set")
timeFormat = "2006-01-02T15:04:05.000Z07"
timeFormat = "2006-01-02T15:04:05.000Z07:00"
writeConsole bool
logLevel uint32
encoding uint32 = jsonEncodingType
// use uint32 for atomic operations
disableStat uint32
infoLog io.WriteCloser
@@ -124,6 +135,12 @@ func SetUp(c LogConf) error {
if len(c.TimeFormat) > 0 {
timeFormat = c.TimeFormat
}
switch c.Encoding {
case plainEncoding:
atomic.StoreUint32(&encoding, plainEncodingType)
default:
atomic.StoreUint32(&encoding, jsonEncodingType)
}
switch c.Mode {
case consoleMode:
@@ -407,21 +424,31 @@ func infoTextSync(msg string) {
}
func outputAny(writer io.Writer, level string, val interface{}) {
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: val,
switch atomic.LoadUint32(&encoding) {
case plainEncodingType:
writePlainAny(writer, level, val)
default:
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: val,
}
outputJson(writer, info)
}
outputJson(writer, info)
}
func outputText(writer io.Writer, level, msg string) {
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: msg,
switch atomic.LoadUint32(&encoding) {
case plainEncodingType:
writePlainText(writer, level, msg)
default:
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: msg,
}
outputJson(writer, info)
}
outputJson(writer, info)
}
func outputError(writer io.Writer, msg string, callDepth int) {
@@ -565,6 +592,62 @@ func statSync(msg string) {
}
}
func writePlainAny(writer io.Writer, level string, val interface{}, fields ...string) {
switch v := val.(type) {
case string:
writePlainText(writer, level, v, fields...)
case error:
writePlainText(writer, level, v.Error(), fields...)
case fmt.Stringer:
writePlainText(writer, level, v.String(), fields...)
default:
var buf bytes.Buffer
buf.WriteString(getTimestamp())
buf.WriteByte(plainEncodingSep)
buf.WriteString(level)
for _, item := range fields {
buf.WriteByte(plainEncodingSep)
buf.WriteString(item)
}
buf.WriteByte(plainEncodingSep)
if err := json.NewEncoder(&buf).Encode(val); err != nil {
log.Println(err.Error())
return
}
buf.WriteByte('\n')
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
log.Println(buf.String())
return
}
if _, err := writer.Write(buf.Bytes()); err != nil {
log.Println(err.Error())
}
}
}
func writePlainText(writer io.Writer, level, msg string, fields ...string) {
var buf bytes.Buffer
buf.WriteString(getTimestamp())
buf.WriteByte(plainEncodingSep)
buf.WriteString(level)
for _, item := range fields {
buf.WriteByte(plainEncodingSep)
buf.WriteString(item)
}
buf.WriteByte(plainEncodingSep)
buf.WriteString(msg)
buf.WriteByte('\n')
if atomic.LoadUint32(&initialized) == 0 || writer == nil {
log.Println(buf.String())
return
}
if _, err := writer.Write(buf.Bytes()); err != nil {
log.Println(err.Error())
}
}
type logWriter struct {
logger *log.Logger
}

View File

@@ -141,6 +141,78 @@ func TestStructedLogInfov(t *testing.T) {
})
}
func TestStructedLogInfoConsoleAny(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(v)
})
}
func TestStructedLogInfoConsoleAnyString(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(fmt.Sprint(v...))
})
}
func TestStructedLogInfoConsoleAnyError(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(errors.New(fmt.Sprint(v...)))
})
}
func TestStructedLogInfoConsoleAnyStringer(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Infov(ValStringer{
val: fmt.Sprint(v...),
})
})
}
func TestStructedLogInfoConsoleText(t *testing.T) {
doTestStructedLogConsole(t, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, plainEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
Info(fmt.Sprint(v...))
})
}
func TestStructedLogSlow(t *testing.T) {
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
slowLog = writer
@@ -432,6 +504,17 @@ func doTestStructedLog(t *testing.T, level string, setup func(writer io.WriteClo
assert.True(t, strings.Contains(val, message))
}
func doTestStructedLogConsole(t *testing.T, setup func(writer io.WriteCloser),
write func(...interface{})) {
const message = "hello there"
writer := new(mockWriter)
setup(writer)
atomic.StoreUint32(&initialized, 1)
write(message)
println(writer.String())
assert.True(t, strings.Contains(writer.String(), message))
}
func testSetLevelTwiceWithMode(t *testing.T, mode string) {
SetUp(LogConf{
Mode: mode,
@@ -456,3 +539,11 @@ func testSetLevelTwiceWithMode(t *testing.T, mode string) {
ErrorStackf(message)
assert.Equal(t, 0, writer.builder.Len())
}
type ValStringer struct {
val string
}
func (v ValStringer) String() string {
return v.val
}

View File

@@ -13,9 +13,9 @@ import (
"sync"
"time"
"github.com/tal-tech/go-zero/core/fs"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/timex"
)
const (

View File

@@ -8,7 +8,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/fs"
"github.com/zeromicro/go-zero/core/fs"
)
func TestDailyRotateRuleMarkRotated(t *testing.T) {

View File

@@ -29,9 +29,9 @@ func TestRedirector(t *testing.T) {
}
func captureOutput(f func()) string {
atomic.StoreUint32(&initialized, 1)
writer := new(mockWriter)
infoLog = writer
atomic.StoreUint32(&initialized, 1)
prevLevel := atomic.LoadUint32(&logLevel)
SetLevel(InfoLevel)
@@ -44,5 +44,9 @@ func captureOutput(f func()) string {
func getContent(jsonStr string) string {
var entry logEntry
json.Unmarshal([]byte(jsonStr), &entry)
return entry.Content.(string)
val, ok := entry.Content.(string)
if ok {
return val
}
return ""
}

View File

@@ -4,9 +4,10 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/timex"
"github.com/zeromicro/go-zero/core/timex"
"go.opentelemetry.io/otel/trace"
)
@@ -77,16 +78,24 @@ func (l *traceLogger) WithDuration(duration time.Duration) Logger {
}
func (l *traceLogger) write(writer io.Writer, level string, val interface{}) {
outputJson(writer, &traceLogger{
logEntry: logEntry{
Timestamp: getTimestamp(),
Level: level,
Duration: l.Duration,
Content: val,
},
Trace: traceIdFromContext(l.ctx),
Span: spanIdFromContext(l.ctx),
})
traceID := traceIdFromContext(l.ctx)
spanID := spanIdFromContext(l.ctx)
switch atomic.LoadUint32(&encoding) {
case plainEncodingType:
writePlainAny(writer, level, val, l.Duration, traceID, spanID)
default:
outputJson(writer, &traceLogger{
logEntry: logEntry{
Timestamp: getTimestamp(),
Level: level,
Duration: l.Duration,
Content: val,
},
Trace: traceID,
Span: spanID,
})
}
}
// WithContext sets ctx to log, for keeping tracing information.

View File

@@ -82,6 +82,37 @@ func TestTraceInfo(t *testing.T) {
assert.True(t, strings.Contains(buf.String(), spanKey))
}
func TestTraceInfoConsole(t *testing.T) {
old := atomic.LoadUint32(&encoding)
atomic.StoreUint32(&encoding, jsonEncodingType)
defer func() {
atomic.StoreUint32(&encoding, old)
}()
var buf mockWriter
atomic.StoreUint32(&initialized, 1)
infoLog = newLogWriter(log.New(&buf, "", flags))
otp := otel.GetTracerProvider()
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
defer otel.SetTracerProvider(otp)
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
l := WithContext(ctx).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Info(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
buf.Reset()
l.WithDuration(time.Second).Infof(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
buf.Reset()
l.WithDuration(time.Second).Infov(testlog)
assert.True(t, strings.Contains(buf.String(), traceIdFromContext(ctx)))
assert.True(t, strings.Contains(buf.String(), spanIdFromContext(ctx)))
}
func TestTraceSlow(t *testing.T) {
var buf mockWriter
atomic.StoreUint32(&initialized, 1)

View File

@@ -3,7 +3,7 @@ package mapping
import (
"io"
"github.com/tal-tech/go-zero/core/jsonx"
"github.com/zeromicro/go-zero/core/jsonx"
)
const jsonTagKey = "json"

View File

@@ -9,9 +9,9 @@ import (
"sync"
"time"
"github.com/tal-tech/go-zero/core/jsonx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/jsonx"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/stringx"
)
const (
@@ -742,7 +742,9 @@ func getValueWithChainedKeys(m Valuer, keys []string) (interface{}, bool) {
if len(keys) == 1 {
v, ok := m.Value(keys[0])
return v, ok
} else if len(keys) > 1 {
}
if len(keys) > 1 {
if v, ok := m.Value(keys[0]); ok {
if nextm, ok := v.(map[string]interface{}); ok {
return getValueWithChainedKeys(MapValuer(nextm), keys[1:])

View File

@@ -8,7 +8,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/stringx"
)
// because json.Number doesn't support strconv.ParseUint(...),

View File

@@ -10,7 +10,7 @@ import (
"strings"
"sync"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/stringx"
)
const (

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"errors"
"io"
"io/ioutil"
"gopkg.in/yaml.v2"
)
@@ -14,7 +13,7 @@ const yamlTagKey = "json"
var (
// ErrUnsupportedType is an error that indicates the config format is not supported.
ErrUnsupportedType = errors.New("only map-like configs are suported")
ErrUnsupportedType = errors.New("only map-like configs are supported")
yamlUnmarshaler = NewUnmarshaler(yamlTagKey)
)
@@ -29,39 +28,6 @@ func UnmarshalYamlReader(reader io.Reader, v interface{}) error {
return unmarshalYamlReader(reader, v, yamlUnmarshaler)
}
func unmarshalYamlBytes(content []byte, v interface{}, unmarshaler *Unmarshaler) error {
var o interface{}
if err := yamlUnmarshal(content, &o); err != nil {
return err
}
if m, ok := o.(map[string]interface{}); ok {
return unmarshaler.Unmarshal(m, v)
}
return ErrUnsupportedType
}
func unmarshalYamlReader(reader io.Reader, v interface{}, unmarshaler *Unmarshaler) error {
content, err := ioutil.ReadAll(reader)
if err != nil {
return err
}
return unmarshalYamlBytes(content, v, unmarshaler)
}
// yamlUnmarshal YAML to map[string]interface{} instead of map[interface{}]interface{}.
func yamlUnmarshal(in []byte, out interface{}) error {
var res interface{}
if err := yaml.Unmarshal(in, &res); err != nil {
return err
}
*out.(*interface{}) = cleanupMapValue(res)
return nil
}
func cleanupInterfaceMap(in map[interface{}]interface{}) map[string]interface{} {
res := make(map[string]interface{})
for k, v := range in {
@@ -96,3 +62,40 @@ func cleanupMapValue(v interface{}) interface{} {
return Repr(v)
}
}
func unmarshal(unmarshaler *Unmarshaler, o interface{}, v interface{}) error {
if m, ok := o.(map[string]interface{}); ok {
return unmarshaler.Unmarshal(m, v)
}
return ErrUnsupportedType
}
func unmarshalYamlBytes(content []byte, v interface{}, unmarshaler *Unmarshaler) error {
var o interface{}
if err := yamlUnmarshal(content, &o); err != nil {
return err
}
return unmarshal(unmarshaler, o, v)
}
func unmarshalYamlReader(reader io.Reader, v interface{}, unmarshaler *Unmarshaler) error {
var res interface{}
if err := yaml.NewDecoder(reader).Decode(&res); err != nil {
return err
}
return unmarshal(unmarshaler, cleanupMapValue(res), v)
}
// yamlUnmarshal YAML to map[string]interface{} instead of map[interface{}]interface{}.
func yamlUnmarshal(in []byte, out interface{}) error {
var res interface{}
if err := yaml.Unmarshal(in, &res); err != nil {
return err
}
*out.(*interface{}) = cleanupMapValue(res)
return nil
}

View File

@@ -926,14 +926,17 @@ func TestUnmarshalYamlBytesError(t *testing.T) {
}
func TestUnmarshalYamlReaderError(t *testing.T) {
payload := `abcd: cdef`
reader := strings.NewReader(payload)
var v struct {
Any string
}
reader := strings.NewReader(`abcd: cdef`)
err := UnmarshalYamlReader(reader, &v)
assert.NotNil(t, err)
reader = strings.NewReader("chenquan")
err = UnmarshalYamlReader(reader, &v)
assert.ErrorIs(t, err, ErrUnsupportedType)
}
func TestUnmarshalYamlBadReader(t *testing.T) {
@@ -1011,6 +1014,6 @@ func TestUnmarshalYamlMapRune(t *testing.T) {
type badReader struct{}
func (b *badReader) Read(p []byte) (n int, err error) {
func (b *badReader) Read(_ []byte) (n int, err error) {
return 0, io.ErrLimitReached
}

View File

@@ -4,7 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/stringx"
)
func TestMaxInt(t *testing.T) {

View File

@@ -2,7 +2,7 @@ package metric
import (
prom "github.com/prometheus/client_golang/prometheus"
"github.com/tal-tech/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/proc"
)
type (

View File

@@ -2,7 +2,7 @@ package metric
import (
prom "github.com/prometheus/client_golang/prometheus"
"github.com/tal-tech/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/proc"
)
type (

View File

@@ -2,7 +2,7 @@ package metric
import (
prom "github.com/prometheus/client_golang/prometheus"
"github.com/tal-tech/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/proc"
)
type (

View File

@@ -3,13 +3,11 @@ package mr
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/tal-tech/go-zero/core/errorx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/errorx"
"github.com/zeromicro/go-zero/core/lang"
)
const (
@@ -25,12 +23,12 @@ var (
)
type (
// ForEachFunc is used to do element processing, but no output.
ForEachFunc func(item interface{})
// GenerateFunc is used to let callers send elements into source.
GenerateFunc func(source chan<- interface{})
// MapFunc is used to do element processing and write the output to writer.
MapFunc func(item interface{}, writer Writer)
// VoidMapFunc is used to do element processing, but no output.
VoidMapFunc func(item interface{})
// MapperFunc is used to do element processing and write the output to writer,
// use cancel func to cancel the processing.
MapperFunc func(item interface{}, writer Writer, cancel func(error))
@@ -43,6 +41,16 @@ type (
// Option defines the method to customize the mapreduce.
Option func(opts *mapReduceOptions)
mapperContext struct {
ctx context.Context
mapper MapFunc
source <-chan interface{}
panicChan *onceChan
collector chan<- interface{}
doneChan <-chan lang.PlaceholderType
workers int
}
mapReduceOptions struct {
ctx context.Context
workers int
@@ -70,7 +78,6 @@ func Finish(fns ...func() error) error {
cancel(err)
}
}, func(pipe <-chan interface{}, cancel func(error)) {
drain(pipe)
}, WithWorkers(len(fns)))
}
@@ -80,7 +87,7 @@ func FinishVoid(fns ...func()) {
return
}
MapVoid(func(source chan<- interface{}) {
ForEach(func(source chan<- interface{}) {
for _, fn := range fns {
source <- fn
}
@@ -90,45 +97,78 @@ func FinishVoid(fns ...func()) {
}, WithWorkers(len(fns)))
}
// Map maps all elements generated from given generate func, and returns an output channel.
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
// ForEach maps all elements from given generate but no output.
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
options := buildOptions(opts...)
source := buildSource(generate)
panicChan := &onceChan{channel: make(chan interface{})}
source := buildSource(generate, panicChan)
collector := make(chan interface{}, options.workers)
done := syncx.NewDoneChan()
done := make(chan lang.PlaceholderType)
go executeMappers(options.ctx, mapper, source, collector, done.Done(), options.workers)
go executeMappers(mapperContext{
ctx: options.ctx,
mapper: func(item interface{}, writer Writer) {
mapper(item)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})
return collector
for {
select {
case v := <-panicChan.channel:
panic(v)
case _, ok := <-collector:
if !ok {
return
}
}
}
}
// MapReduce maps all elements generated from given generate func,
// and reduces the output elements with given reducer.
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {
source := buildSource(generate)
return MapReduceWithSource(source, mapper, reducer, opts...)
panicChan := &onceChan{channel: make(chan interface{})}
source := buildSource(generate, panicChan)
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
}
// MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer.
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {
panicChan := &onceChan{channel: make(chan interface{})}
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
}
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
reducer ReducerFunc, opts ...Option) (interface{}, error) {
options := buildOptions(opts...)
// output is used to write the final result
output := make(chan interface{})
defer func() {
// reducer can only write once, if more, panic
for range output {
panic("more than one element written in reducer")
}
}()
// collector is used to collect data from mapper, and consume in reducer
collector := make(chan interface{}, options.workers)
done := syncx.NewDoneChan()
writer := newGuardedWriter(options.ctx, output, done.Done())
// if done is closed, all mappers and reducer should stop processing
done := make(chan lang.PlaceholderType)
writer := newGuardedWriter(options.ctx, output, done)
var closeOnce sync.Once
// use atomic.Value to avoid data race
var retErr errorx.AtomicError
finish := func() {
closeOnce.Do(func() {
done.Close()
close(done)
close(output)
})
}
@@ -146,28 +186,41 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
go func() {
defer func() {
drain(collector)
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
panicChan.write(r)
}
finish()
}()
reducer(collector, writer, cancel)
}()
go executeMappers(options.ctx, func(item interface{}, w Writer) {
mapper(item, w, cancel)
}, source, collector, done.Done(), options.workers)
go executeMappers(mapperContext{
ctx: options.ctx,
mapper: func(item interface{}, w Writer) {
mapper(item, w, cancel)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})
value, ok := <-output
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return value, nil
} else {
return nil, ErrReduceNoOutput
select {
case <-options.ctx.Done():
cancel(context.DeadlineExceeded)
return nil, context.DeadlineExceeded
case v := <-panicChan.channel:
panic(v)
case v, ok := <-output:
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return v, nil
} else {
return nil, ErrReduceNoOutput
}
}
}
@@ -176,18 +229,12 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
reducer(input, cancel)
// We need to write a placeholder to let MapReduce to continue on reducer done,
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
writer.Write(lang.Placeholder)
}, opts...)
return err
}
if errors.Is(err, ErrReduceNoOutput) {
return nil
}
// MapVoid maps all elements from given generate but no output.
func MapVoid(generate GenerateFunc, mapper VoidMapFunc, opts ...Option) {
drain(Map(generate, func(item interface{}, writer Writer) {
mapper(item)
}, opts...))
return err
}
// WithContext customizes a mapreduce processing accepts a given ctx.
@@ -217,12 +264,18 @@ func buildOptions(opts ...Option) *mapReduceOptions {
return options
}
func buildSource(generate GenerateFunc) chan interface{} {
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
source := make(chan interface{})
threading.GoSafe(func() {
defer close(source)
go func() {
defer func() {
if r := recover(); r != nil {
panicChan.write(r)
}
close(source)
}()
generate(source)
})
}()
return source
}
@@ -234,39 +287,43 @@ func drain(channel <-chan interface{}) {
}
}
func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
func executeMappers(mCtx mapperContext) {
var wg sync.WaitGroup
defer func() {
wg.Wait()
close(collector)
close(mCtx.collector)
drain(mCtx.source)
}()
pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(ctx, collector, done)
for {
var failed int32
pool := make(chan lang.PlaceholderType, mCtx.workers)
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
for atomic.LoadInt32(&failed) == 0 {
select {
case <-ctx.Done():
case <-mCtx.ctx.Done():
return
case <-done:
case <-mCtx.doneChan:
return
case pool <- lang.Placeholder:
item, ok := <-input
item, ok := <-mCtx.source
if !ok {
<-pool
return
}
wg.Add(1)
// better to safely run caller defined method
threading.GoSafe(func() {
go func() {
defer func() {
if r := recover(); r != nil {
atomic.AddInt32(&failed, 1)
mCtx.panicChan.write(r)
}
wg.Done()
<-pool
}()
mapper(item, writer)
})
mCtx.mapper(item, writer)
}()
}
}
}
@@ -312,3 +369,16 @@ func (gw guardedWriter) Write(v interface{}) {
gw.channel <- v
}
}
type onceChan struct {
channel chan interface{}
wrote int32
}
func (oc *onceChan) write(val interface{}) {
if atomic.AddInt32(&oc.wrote, 1) > 1 {
return
}
oc.channel <- val
}

View File

@@ -0,0 +1,78 @@
//go:build go1.18
// +build go1.18
package mr
import (
"fmt"
"math/rand"
"runtime"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)
func FuzzMapReduce(f *testing.F) {
rand.Seed(time.Now().UnixNano())
f.Add(uint(10), uint(runtime.NumCPU()))
f.Fuzz(func(t *testing.T, num uint, workers uint) {
n := int64(num)%5000 + 5000
genPanic := rand.Intn(100) == 0
mapperPanic := rand.Intn(100) == 0
reducerPanic := rand.Intn(100) == 0
genIdx := rand.Int63n(n)
mapperIdx := rand.Int63n(n)
reducerIdx := rand.Int63n(n)
squareSum := (n - 1) * n * (2*n - 1) / 6
fn := func() (interface{}, error) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
return MapReduce(func(source chan<- interface{}) {
for i := int64(0); i < n; i++ {
source <- i
if genPanic && i == genIdx {
panic("foo")
}
}
}, func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int64)
if mapperPanic && v == mapperIdx {
panic("bar")
}
writer.Write(v * v)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var idx int64
var total int64
for v := range pipe {
if reducerPanic && idx == reducerIdx {
panic("baz")
}
total += v.(int64)
idx++
}
writer.Write(total)
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
}
if genPanic || mapperPanic || reducerPanic {
var buf strings.Builder
buf.WriteString(fmt.Sprintf("n: %d", n))
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
assert.Panicsf(t, func() { fn() }, buf.String())
} else {
val, err := fn()
assert.Nil(t, err)
assert.Equal(t, squareSum, val.(int64))
}
})
}

View File

@@ -0,0 +1,107 @@
//go:build fuzz
// +build fuzz
package mr
import (
"fmt"
"math/rand"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/threading"
"gopkg.in/cheggaaa/pb.v1"
)
// If Fuzz stuck, we don't know why, because it only returns hung or unexpected,
// so we need to simulate the fuzz test in test mode.
func TestMapReduceRandom(t *testing.T) {
rand.Seed(time.Now().UnixNano())
const (
times = 10000
nRange = 500
mega = 1024 * 1024
)
bar := pb.New(times).Start()
runner := threading.NewTaskRunner(runtime.NumCPU())
var wg sync.WaitGroup
wg.Add(times)
for i := 0; i < times; i++ {
runner.Schedule(func() {
start := time.Now()
defer func() {
if time.Since(start) > time.Minute {
t.Fatal("timeout")
}
wg.Done()
}()
t.Run(strconv.Itoa(i), func(t *testing.T) {
n := rand.Int63n(nRange)%nRange + nRange
workers := rand.Int()%50 + runtime.NumCPU()/2
genPanic := rand.Intn(100) == 0
mapperPanic := rand.Intn(100) == 0
reducerPanic := rand.Intn(100) == 0
genIdx := rand.Int63n(n)
mapperIdx := rand.Int63n(n)
reducerIdx := rand.Int63n(n)
squareSum := (n - 1) * n * (2*n - 1) / 6
fn := func() (interface{}, error) {
return MapReduce(func(source chan<- interface{}) {
for i := int64(0); i < n; i++ {
source <- i
if genPanic && i == genIdx {
panic("foo")
}
}
}, func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int64)
if mapperPanic && v == mapperIdx {
panic("bar")
}
writer.Write(v * v)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var idx int64
var total int64
for v := range pipe {
if reducerPanic && idx == reducerIdx {
panic("baz")
}
total += v.(int64)
idx++
}
writer.Write(total)
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
}
if genPanic || mapperPanic || reducerPanic {
var buf strings.Builder
buf.WriteString(fmt.Sprintf("n: %d", n))
buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic))
buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic))
buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic))
buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx))
buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx))
buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx))
assert.Panicsf(t, func() { fn() }, buf.String())
} else {
val, err := fn()
assert.Nil(t, err)
assert.Equal(t, squareSum, val.(int64))
}
bar.Increment()
})
})
}
wg.Wait()
bar.Finish()
}

View File

@@ -11,8 +11,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/syncx"
"go.uber.org/goleak"
)
var errDummy = errors.New("dummy")
@@ -22,6 +21,8 @@ func init() {
}
func TestFinish(t *testing.T) {
defer goleak.VerifyNone(t)
var total uint32
err := Finish(func() error {
atomic.AddUint32(&total, 2)
@@ -39,14 +40,20 @@ func TestFinish(t *testing.T) {
}
func TestFinishNone(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Nil(t, Finish())
}
func TestFinishVoidNone(t *testing.T) {
defer goleak.VerifyNone(t)
FinishVoid()
}
func TestFinishErr(t *testing.T) {
defer goleak.VerifyNone(t)
var total uint32
err := Finish(func() error {
atomic.AddUint32(&total, 2)
@@ -63,6 +70,8 @@ func TestFinishErr(t *testing.T) {
}
func TestFinishVoid(t *testing.T) {
defer goleak.VerifyNone(t)
var total uint32
FinishVoid(func() {
atomic.AddUint32(&total, 2)
@@ -75,70 +84,107 @@ func TestFinishVoid(t *testing.T) {
assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
}
func TestMap(t *testing.T) {
tests := []struct {
mapper MapFunc
expect int
}{
{
mapper: func(item interface{}, writer Writer) {
v := item.(int)
writer.Write(v * v)
},
expect: 30,
},
{
mapper: func(item interface{}, writer Writer) {
v := item.(int)
if v%2 == 0 {
return
}
writer.Write(v * v)
},
expect: 10,
},
{
mapper: func(item interface{}, writer Writer) {
v := item.(int)
if v%2 == 0 {
panic(v)
}
writer.Write(v * v)
},
expect: 10,
},
}
func TestForEach(t *testing.T) {
const tasks = 1000
for _, test := range tests {
t.Run(stringx.Rand(), func(t *testing.T) {
channel := Map(func(source chan<- interface{}) {
for i := 1; i < 5; i++ {
t.Run("all", func(t *testing.T) {
defer goleak.VerifyNone(t)
var count uint32
ForEach(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}) {
atomic.AddUint32(&count, 1)
}, WithWorkers(-1))
assert.Equal(t, tasks, int(count))
})
t.Run("odd", func(t *testing.T) {
defer goleak.VerifyNone(t)
var count uint32
ForEach(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}) {
if item.(int)%2 == 0 {
atomic.AddUint32(&count, 1)
}
})
assert.Equal(t, tasks/2, int(count))
})
t.Run("all", func(t *testing.T) {
defer goleak.VerifyNone(t)
assert.PanicsWithValue(t, "foo", func() {
ForEach(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, test.mapper, WithWorkers(-1))
var result int
for v := range channel {
result += v.(int)
}
assert.Equal(t, test.expect, result)
}, func(item interface{}) {
panic("foo")
})
})
}
})
}
func TestGeneratePanic(t *testing.T) {
defer goleak.VerifyNone(t)
t.Run("all", func(t *testing.T) {
assert.PanicsWithValue(t, "foo", func() {
ForEach(func(source chan<- interface{}) {
panic("foo")
}, func(item interface{}) {
})
})
})
}
func TestMapperPanic(t *testing.T) {
defer goleak.VerifyNone(t)
const tasks = 1000
var run int32
t.Run("all", func(t *testing.T) {
assert.PanicsWithValue(t, "foo", func() {
_, _ = MapReduce(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}, writer Writer, cancel func(error)) {
atomic.AddInt32(&run, 1)
panic("foo")
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
})
})
assert.True(t, atomic.LoadInt32(&run) < tasks/2)
})
}
func TestMapReduce(t *testing.T) {
defer goleak.VerifyNone(t)
tests := []struct {
name string
mapper MapperFunc
reducer ReducerFunc
expectErr error
expectValue interface{}
}{
{
name: "simple",
expectErr: nil,
expectValue: 30,
},
{
name: "cancel with error",
mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
if v%3 == 0 {
@@ -149,6 +195,7 @@ func TestMapReduce(t *testing.T) {
expectErr: errDummy,
},
{
name: "cancel with nil",
mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
if v%3 == 0 {
@@ -160,6 +207,7 @@ func TestMapReduce(t *testing.T) {
expectValue: nil,
},
{
name: "cancel with more",
reducer: func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var result int
for item := range pipe {
@@ -174,36 +222,74 @@ func TestMapReduce(t *testing.T) {
},
}
for _, test := range tests {
t.Run(stringx.Rand(), func(t *testing.T) {
if test.mapper == nil {
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
writer.Write(v * v)
}
}
if test.reducer == nil {
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var result int
for item := range pipe {
result += item.(int)
t.Run("MapReduce", func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mapper == nil {
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
writer.Write(v * v)
}
writer.Write(result)
}
}
value, err := MapReduce(func(source chan<- interface{}) {
for i := 1; i < 5; i++ {
source <- i
if test.reducer == nil {
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var result int
for item := range pipe {
result += item.(int)
}
writer.Write(result)
}
}
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
value, err := MapReduce(func(source chan<- interface{}) {
for i := 1; i < 5; i++ {
source <- i
}
}, test.mapper, test.reducer, WithWorkers(runtime.NumCPU()))
assert.Equal(t, test.expectErr, err)
assert.Equal(t, test.expectValue, value)
})
}
assert.Equal(t, test.expectErr, err)
assert.Equal(t, test.expectValue, value)
})
}
})
t.Run("MapReduce", func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mapper == nil {
test.mapper = func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
writer.Write(v * v)
}
}
if test.reducer == nil {
test.reducer = func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
var result int
for item := range pipe {
result += item.(int)
}
writer.Write(result)
}
}
source := make(chan interface{})
go func() {
for i := 1; i < 5; i++ {
source <- i
}
close(source)
}()
value, err := MapReduceChan(source, test.mapper, test.reducer, WithWorkers(-1))
assert.Equal(t, test.expectErr, err)
assert.Equal(t, test.expectValue, value)
})
}
})
}
func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Panics(t, func() {
MapReduce(func(source chan<- interface{}) {
for i := 0; i < 10; i++ {
@@ -220,18 +306,23 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
}
func TestMapReduceVoid(t *testing.T) {
defer goleak.VerifyNone(t)
var value uint32
tests := []struct {
name string
mapper MapperFunc
reducer VoidReducerFunc
expectValue uint32
expectErr error
}{
{
name: "simple",
expectValue: 30,
expectErr: nil,
},
{
name: "cancel with error",
mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
if v%3 == 0 {
@@ -242,6 +333,7 @@ func TestMapReduceVoid(t *testing.T) {
expectErr: errDummy,
},
{
name: "cancel with nil",
mapper: func(item interface{}, writer Writer, cancel func(error)) {
v := item.(int)
if v%3 == 0 {
@@ -252,6 +344,7 @@ func TestMapReduceVoid(t *testing.T) {
expectErr: ErrCancelWithNil,
},
{
name: "cancel with more",
reducer: func(pipe <-chan interface{}, cancel func(error)) {
for item := range pipe {
result := atomic.AddUint32(&value, uint32(item.(int)))
@@ -265,7 +358,7 @@ func TestMapReduceVoid(t *testing.T) {
}
for _, test := range tests {
t.Run(stringx.Rand(), func(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
atomic.StoreUint32(&value, 0)
if test.mapper == nil {
@@ -296,6 +389,8 @@ func TestMapReduceVoid(t *testing.T) {
}
func TestMapReduceVoidWithDelay(t *testing.T) {
defer goleak.VerifyNone(t)
var result []int
err := MapReduceVoid(func(source chan<- interface{}) {
source <- 0
@@ -318,38 +413,64 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
assert.Equal(t, 0, result[1])
}
func TestMapVoid(t *testing.T) {
const tasks = 1000
var count uint32
MapVoid(func(source chan<- interface{}) {
for i := 0; i < tasks; i++ {
source <- i
}
}, func(item interface{}) {
atomic.AddUint32(&count, 1)
})
func TestMapReducePanic(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Equal(t, tasks, int(count))
assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- interface{}) {
source <- 0
source <- 1
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
writer.Write(i)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
for range pipe {
panic("panic")
}
})
})
}
func TestMapReducePanic(t *testing.T) {
v, err := MapReduce(func(source chan<- interface{}) {
source <- 0
source <- 1
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
writer.Write(i)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
for range pipe {
panic("panic")
}
func TestMapReducePanicOnce(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- interface{}) {
for i := 0; i < 100; i++ {
source <- i
}
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
if i == 0 {
panic("foo")
}
writer.Write(i)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
for range pipe {
panic("bar")
}
})
})
}
func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
defer goleak.VerifyNone(t)
assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- interface{}) {
source <- 0
source <- 1
}, func(item interface{}, writer Writer, cancel func(error)) {
panic("foo")
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
panic("bar")
})
})
assert.Nil(t, v)
assert.NotNil(t, err)
assert.Equal(t, "panic", err.Error())
}
func TestMapReduceVoidCancel(t *testing.T) {
defer goleak.VerifyNone(t)
var result []int
err := MapReduceVoid(func(source chan<- interface{}) {
source <- 0
@@ -371,13 +492,15 @@ func TestMapReduceVoidCancel(t *testing.T) {
}
func TestMapReduceVoidCancelWithRemains(t *testing.T) {
var done syncx.AtomicBool
defer goleak.VerifyNone(t)
var done int32
var result []int
err := MapReduceVoid(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ {
source <- i
}
done.Set(true)
atomic.AddInt32(&done, 1)
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
if i == defaultWorkers/2 {
@@ -392,10 +515,12 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
})
assert.NotNil(t, err)
assert.Equal(t, "anything", err.Error())
assert.True(t, done.True())
assert.Equal(t, int32(1), done)
}
func TestMapReduceWithoutReducerWrite(t *testing.T) {
defer goleak.VerifyNone(t)
uids := []int{1, 2, 3}
res, err := MapReduce(func(source chan<- interface{}) {
for _, uid := range uids {
@@ -412,33 +537,54 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) {
}
func TestMapReduceVoidPanicInReducer(t *testing.T) {
defer goleak.VerifyNone(t)
const message = "foo"
var done syncx.AtomicBool
err := MapReduceVoid(func(source chan<- interface{}) {
assert.Panics(t, func() {
var done int32
_ = MapReduceVoid(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ {
source <- i
}
atomic.AddInt32(&done, 1)
}, func(item interface{}, writer Writer, cancel func(error)) {
i := item.(int)
writer.Write(i)
}, func(pipe <-chan interface{}, cancel func(error)) {
panic(message)
}, WithWorkers(1))
})
}
func TestForEachWithContext(t *testing.T) {
defer goleak.VerifyNone(t)
var done int32
ctx, cancel := context.WithCancel(context.Background())
ForEach(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ {
source <- i
}
done.Set(true)
}, func(item interface{}, writer Writer, cancel func(error)) {
atomic.AddInt32(&done, 1)
}, func(item interface{}) {
i := item.(int)
writer.Write(i)
}, func(pipe <-chan interface{}, cancel func(error)) {
panic(message)
}, WithWorkers(1))
assert.NotNil(t, err)
assert.Equal(t, message, err.Error())
assert.True(t, done.True())
if i == defaultWorkers/2 {
cancel()
}
}, WithContext(ctx))
}
func TestMapReduceWithContext(t *testing.T) {
var done syncx.AtomicBool
defer goleak.VerifyNone(t)
var done int32
var result []int
ctx, cancel := context.WithCancel(context.Background())
err := MapReduceVoid(func(source chan<- interface{}) {
for i := 0; i < defaultWorkers*2; i++ {
source <- i
}
done.Set(true)
atomic.AddInt32(&done, 1)
}, func(item interface{}, writer Writer, c func(error)) {
i := item.(int)
if i == defaultWorkers/2 {
@@ -452,7 +598,7 @@ func TestMapReduceWithContext(t *testing.T) {
}
}, WithContext(ctx))
assert.NotNil(t, err)
assert.Equal(t, ErrReduceNoOutput, err)
assert.Equal(t, context.DeadlineExceeded, err)
}
func BenchmarkMapReduce(b *testing.B) {

89
core/mr/readme-cn.md Normal file
View File

@@ -0,0 +1,89 @@
# mapreduce
[English](readme.md) | 简体中文
## 为什么需要 MapReduce
在实际的业务场景中我们常常需要从不同的 rpc 服务中获取相应属性来组装成复杂对象。
比如要查询商品详情:
1. 商品服务-查询商品属性
2. 库存服务-查询库存属性
3. 价格服务-查询价格属性
4. 营销服务-查询营销属性
如果是串行调用的话响应时间会随着 rpc 调用次数呈线性增长,所以我们要优化性能一般会将串行改并行。
简单的场景下使用 `WaitGroup` 也能够满足需求,但是如果我们需要对 rpc 调用返回的数据进行校验、数据加工转换、数据汇总呢?继续使用 `WaitGroup` 就有点力不从心了go 的官方库中并没有这种工具java 中提供了 CompleteFuture我们依据 MapReduce 架构思想实现了进程内的数据批处理 MapReduce 并发工具类。
## 设计思路
我们尝试把自己代入到作者的角色梳理一下并发工具可能的业务场景:
1. 查询商品详情:支持并发调用多个服务来组合产品属性,支持调用错误可以立即结束。
2. 商品详情页自动推荐用户卡券:支持并发校验卡券,校验失败自动剔除,返回全部卡券。
以上实际都是在进行对输入数据进行处理最后输出清洗后的数据,针对数据处理有个非常经典的异步模式:生产者消费者模式。于是我们可以抽象一下数据批处理的生命周期,大致可以分为三个阶段:
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-serial-cn.png" width="500">
1. 数据生产 generate
2. 数据加工 mapper
3. 数据聚合 reducer
其中数据生产是不可或缺的阶段,数据加工、数据聚合是可选阶段,数据生产与加工支持并发调用,数据聚合基本属于纯内存操作单协程即可。
再来思考一下不同阶段之间数据应该如何流转,既然不同阶段的数据处理都是由不同 goroutine 执行的,那么很自然的可以考虑采用 channel 来实现 goroutine 之间的通信。
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-cn.png" width="500">
如何实现随时终止流程呢?
`goroutine` 中监听一个全局的结束 `channel` 和调用方提供的 `ctx` 就行。
## 简单示例
并行求平方和(不要嫌弃示例简单,只是模拟并发)
```go
package main
import (
"fmt"
"log"
"github.com/zeromicro/go-zero/core/mr"
)
func main() {
val, err := mr.MapReduce(func(source chan<- interface{}) {
// generator
for i := 0; i < 10; i++ {
source <- i
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
// mapper
i := item.(int)
writer.Write(i * i)
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
// reducer
var sum int
for i := range pipe {
sum += i.(int)
}
writer.Write(sum)
})
if err != nil {
log.Fatal(err)
}
fmt.Println("result:", val)
}
```
更多示例:[https://github.com/zeromicro/zero-examples/tree/main/mapreduce](https://github.com/zeromicro/zero-examples/tree/main/mapreduce)
## 欢迎 star
如果你正在使用或者觉得这个项目对你有帮助,请 **star** 支持,感谢!

90
core/mr/readme.md Normal file
View File

@@ -0,0 +1,90 @@
<img align="right" width="150px" src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/go-zero.png">
# mapreduce
English | [简体中文](readme-cn.md)
## Why MapReduce is needed
In practical business scenarios we often need to get the corresponding properties from different rpc services to assemble complex objects.
For example, to query product details.
1. product service - query product attributes
2. inventory service - query inventory properties
3. price service - query price attributes
4. marketing service - query marketing properties
If it is a serial call, the response time will increase linearly with the number of rpc calls, so we will generally change serial to parallel to optimize response time.
Simple scenarios using `WaitGroup` can also meet the needs, but what if we need to check the data returned by the rpc call, data processing, data aggregation? The official go library does not have such a tool (CompleteFuture is provided in java), so we implemented an in-process data batching MapReduce concurrent tool based on the MapReduce architecture.
## Design ideas
Let's try to put ourselves in the author's shoes and sort out the possible business scenarios for the concurrency tool:
1. querying product details: supporting concurrent calls to multiple services to combine product attributes, and supporting call errors that can be ended immediately.
2. automatic recommendation of user card coupons on product details page: support concurrently verifying card coupons, automatically rejecting them if they fail, and returning all of them.
The above is actually processing the input data and finally outputting the cleaned data. There is a very classic asynchronous pattern for data processing: the producer-consumer pattern. So we can abstract the life cycle of data batch processing, which can be roughly divided into three phases.
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-serial-en.png" width="500">
1. data production generate
2. data processing mapper
3. data aggregation reducer
Data producing is an indispensable stage, data processing and data aggregation are optional stages, data producing and processing support concurrent calls, data aggregation is basically a pure memory operation, so a single concurrent process can do it.
Since different stages of data processing are performed by different goroutines, it is natural to consider the use of channel to achieve communication between goroutines.
<img src="https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mapreduce-en.png" width="500">
How can I terminate the process at any time?
It's simple, just receive from a channel or the given context in the goroutine.
## A simple example
Calculate the sum of squares, simulating the concurrency.
```go
package main
import (
"fmt"
"log"
"github.com/zeromicro/go-zero/core/mr"
)
func main() {
val, err := mr.MapReduce(func(source chan<- interface{}) {
// generator
for i := 0; i < 10; i++ {
source <- i
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
// mapper
i := item.(int)
writer.Write(i * i)
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
// reducer
var sum int
for i := range pipe {
sum += i.(int)
}
writer.Write(sum)
})
if err != nil {
log.Fatal(err)
}
fmt.Println("result:", val)
}
```
More examples: [https://github.com/zeromicro/zero-examples/tree/main/mapreduce](https://github.com/zeromicro/zero-examples/tree/main/mapreduce)
## Give a Star! ⭐
If you like or are using this project to learn or start your solution, please give it a star. Thanks!

View File

@@ -11,7 +11,7 @@ import (
"syscall"
"time"
"github.com/tal-tech/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/logx"
)
const (

Some files were not shown because too many files have changed in this diff Show More