Compare commits

..

200 Commits

Author SHA1 Message Date
Kevin Wan
9ccb997ed8 refactor mapping (#782) 2021-06-23 14:57:37 +08:00
skykiss
01c92a6bc5 fix: Fix problems with non support for multidimensional arrays and basic type pointer arrays (#778)
Co-authored-by: shaoqian <shaoqian.zhang@appshahe.com>
2021-06-23 10:58:01 +08:00
lucaq
c9a2a60e28 Add Sinter,Sinterstore & Modify TestRedis_Set (#779)
* Add Sinter,Sinterstore; Modify TestRedis_Set

* Update redis_test.go

fix test failure

Co-authored-by: lucq <lucq@toopsoon.com>
Co-authored-by: Kevin Wan <wanjunfeng@gmail.com>
2021-06-23 10:46:16 +08:00
Kevin Wan
b0739d63c0 update readme images (#776) 2021-06-21 16:45:44 +08:00
Kevin Wan
c22f84cb5f update image rendering in readme (#775) 2021-06-21 16:34:14 +08:00
Kevin Wan
60450bab02 disable load & stat logs for goctl (#773) 2021-06-21 14:25:33 +08:00
Kevin Wan
3e8cec5c78 upgrade grpc & etcd dependencies (#771) 2021-06-21 09:05:20 +08:00
Kevin Wan
74ee163761 fix bug that etcd stream cancelled without re-watch (#770) 2021-06-17 18:46:16 +08:00
anqiansong
ea4f680052 Fix issue #747 (#765)
Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-06-15 18:54:41 +08:00
heyanfu
58cdba2c5d remove useless annotation (#761) 2021-06-14 16:00:47 +08:00
Kevin Wan
a2fbc14c70 add roadmap (#764) 2021-06-13 11:59:30 +08:00
Kevin Wan
158df8c270 fix broken link (#763) 2021-06-13 11:35:19 +08:00
Kevin Wan
30ec236a87 add contributing guid (#762) 2021-06-13 11:33:29 +08:00
Kevin Wan
ac3653b3f9 add code of conduct (#760) 2021-06-12 22:17:00 +08:00
Kevin Wan
8520db4fd9 refactor fx (#759)
* refactor fx

* refactor fx, format code
2021-06-10 19:57:36 +08:00
Chen Quan
14141fed62 Add some stream features (#712)
* Add some stream features

* Update empty

* Fix initialization loop

* Delete ForeachOrdered && Fix FindFirst

* Add test case && Delete redundant code

* Update test case

* Delete SplitSteam

* Delete redundant code
2021-06-10 18:20:40 +08:00
Kevin Wan
5d86cc2f20 add go-zero users (#756) 2021-06-07 14:08:54 +08:00
Kevin Wan
8a6e4b7580 add go-zero users (#751) 2021-06-03 15:07:21 +08:00
anqiansong
453f949638 replace cache key with colon (#746)
Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-06-02 10:37:49 +08:00
Kevin Wan
75a330184d add go-zero users (#739) 2021-05-29 23:58:43 +08:00
kingxt
546fcd8bab fix #736 (#738)
* optimize performance

* rename

* rename

* revert
2021-05-29 23:01:02 +08:00
Xavier Cheng
3022f93b6d Fix a typo (#729)
alread -> already
2021-05-28 23:39:07 +08:00
Kevin Wan
8ffc392c66 add go-zero users, update slack invite link (#728) 2021-05-28 14:32:47 +08:00
Kevin Wan
ae7d85dadf add go-zero users (#726) 2021-05-28 10:50:33 +08:00
Kevin Wan
e89268ac37 add go-zero users. (#723)
* add go-zero users

* add go-zero users
2021-05-27 22:53:55 +08:00
Kevin Wan
aaa3623404 optimize nested conditional (#709) 2021-05-22 23:18:38 +08:00
heyanfu
8998f16054 optimize nested conditional (#708)
Co-authored-by: heyanfu <heyanfu@kingsoft.com>
2021-05-22 22:56:06 +08:00
anqiansong
94417be018 Add document & comment for spec (#703)
* Add document & comment for spec

* remove duplicate field

* use alias
2021-05-21 10:40:59 +08:00
Kevin Wan
f300408fc0 fix golint issues, and optimize code (#705) 2021-05-21 10:38:38 +08:00
Kevin Wan
aaa39e17a3 print entire sql statements in logx if necessary (#704) 2021-05-20 16:14:44 +08:00
Bo-Yi Wu
73906f996d chore(format): change by gofumpt tool (#697)
Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
2021-05-18 14:43:09 +08:00
Kevin Wan
73417f54db update goctl version to 1.1.8 (#696) 2021-05-17 15:13:48 +08:00
Kevin Wan
491213afb8 fix #683 (#690)
* fix #683

* fix errors
2021-05-15 15:37:24 +08:00
Julian-Chu
edf743cd72 fix invalid link (#689) 2021-05-15 15:37:10 +08:00
Kevin Wan
78a88be787 add go-zero users (#688) 2021-05-14 22:52:19 +08:00
anqiansong
9f6a574f97 resolve #610 (#684) 2021-05-13 18:42:05 +08:00
anqiansong
ea01cc78f0 Optimize model nl (#686) 2021-05-12 12:28:23 +08:00
Kevin Wan
a87978568a fix #676 (#682) 2021-05-10 23:10:57 +08:00
Kevin Wan
14cecb9b31 update readme for documents links (#681) 2021-05-10 22:22:32 +08:00
_ksco
0ce54100a4 fix zh_cn document url (#678) 2021-05-10 22:18:33 +08:00
anqiansong
d28ac35ff7 fix issue: https://github.com/zeromicro/goctl-swagger/issues/6 (#680) 2021-05-10 19:57:12 +08:00
heyanfu
a5962f677f fix some typo (#677) 2021-05-10 00:09:00 +08:00
Kevin Wan
8478474f7f update readme (#673) 2021-05-08 21:55:14 +08:00
anqiansong
df5ae9507f replace antlr module (#672)
* replace antlr module

* refactor version of antlr
2021-05-08 21:35:27 +08:00
noel
faf4d7e3bb modify the order of PrometheusHandler (#670)
* modify the order of PrometheusHandler

* modify the order of PrometheusHandler
2021-05-08 17:11:16 +08:00
anqiansong
f64fe5eb5e fix antlr mod (#669) 2021-05-08 00:03:01 +08:00
heyanfu
97d889103a fix some typo (#667) 2021-05-04 21:33:08 +08:00
Kevin Wan
9a44310d00 update wechat qrcode (#665) 2021-05-02 15:06:16 +08:00
Kevin Wan
06eeef2cf3 disable prometheus if not configured (#663) 2021-04-30 15:09:49 +08:00
Kevin Wan
9adc7d4cb9 fix comment function names (#649) 2021-04-23 11:56:41 +08:00
Kevin Wan
006f78c3d5 add go-zero users (#643) 2021-04-21 10:24:15 +08:00
Kevin Wan
64a8e65f4a update readme (#640) 2021-04-20 23:57:57 +08:00
anqiansong
8fd1e76d29 update readme (#638) 2021-04-19 14:37:47 +08:00
heyanfu
0466af5e49 optimize code (#637) 2021-04-18 22:49:03 +08:00
heyanfu
7405d7f506 spelling mistakes (#634) 2021-04-17 20:15:19 +08:00
Bo-Yi Wu
afd9ff889e chore: update code format. (#628) 2021-04-15 19:49:17 +08:00
另维64
7e087de6e6 doc: fix spell mistake (#627) 2021-04-14 17:58:27 +08:00
Kevin Wan
5aded99df5 update go-zero users (#623) 2021-04-13 14:38:40 +08:00
Kevin Wan
08fb980ad2 add syncx.Guard func (#620) 2021-04-13 00:04:19 +08:00
Kevin Wan
b94d7aa532 update readme (#617) 2021-04-10 19:19:05 +08:00
Kevin Wan
ee630b8b57 add code coverage (#615)
* add code coverage

* simplify redis code
2021-04-09 22:40:43 +08:00
Kevin Wan
bd82b7d8de add FAQs in readme (#612) 2021-04-09 18:59:17 +08:00
Kevin Wan
3d729c77a6 update go-zero users (#611) 2021-04-09 14:16:31 +08:00
Kevin Wan
e944b59bb3 update go-zero users (#609)
* add go-zero users registry notes

* update go-zero users

* fix typo
2021-04-09 10:43:47 +08:00
Kevin Wan
54b5e3f4b2 add go-zero users registry notes (#608) 2021-04-08 22:44:41 +08:00
Kevin Wan
b913229028 add go-zero users (#607) 2021-04-08 22:30:45 +08:00
Kevin Wan
9963ffb1c1 simplify redis tls implementation (#606) 2021-04-08 18:19:36 +08:00
r00mz
8cb6490724 redis增加tls支持 (#595)
* redis连接增加支持tls选项

* 优化redis tls config 写法

* redis增加tls支持

* 增加redis tls测试用例,但redis tls local server不支持,测试用例全部NotNil

Co-authored-by: liuyi <liuyi@fangyb.com>
Co-authored-by: yi.liu <yi.liu@xshoppy.com>
2021-04-07 20:44:16 +08:00
Kevin Wan
05e37ee20f refactor - remove ShrinkDeadline, it's the same as context.WithTimeout (#599) 2021-04-05 22:59:24 +08:00
zjbztianya
d88da4cc88 Replace contextx.ShrinkDeadline with context.WithTimeout (#598) 2021-04-05 21:20:35 +08:00
Oraoto
425430f67c Simplify contextx.ShrinkDeadline (#596) 2021-04-03 21:25:32 +08:00
Zcc、
4e0d91f6c0 fix (#592)
Co-authored-by: zhoudeyu <zhoudeyu@xiaoheiban.cn>
2021-04-01 18:42:50 +08:00
Kevin Wan
8584351b6d update regression test comment (#590) 2021-03-30 21:23:07 +08:00
Kevin Wan
b19c5223a9 update regression test comment (#589) 2021-03-30 20:53:35 +08:00
bittoy
99a2d95433 remove rt mode log (#587) 2021-03-30 20:45:55 +08:00
Ted Chen
9db222bf5b fix a simple typo (#588) 2021-03-29 23:35:49 +08:00
Kevin Wan
ac648d08cb fix typo (#586) 2021-03-28 22:10:07 +08:00
Kevin Wan
6df7fa619c fix typo (#585) 2021-03-28 21:20:04 +08:00
Kevin Wan
bbb4ce586f fix golint issues (#584) 2021-03-28 20:42:11 +08:00
anqiansong
888551627c optimize code (#579)
* optimize code

* optimize returns & unit test
2021-03-27 17:33:17 +08:00
Kevin Wan
bd623aaac3 support postgresql (#583)
support postgresql
2021-03-27 17:14:32 +08:00
Kevin Wan
9e6c2ba2c0 avoid goroutine leak after timeout (#575) 2021-03-21 16:54:34 +08:00
Kevin Wan
c0db8d017d gofmt logs (#574) 2021-03-20 16:40:09 +08:00
TonyWang
52b4f8ca91 add timezone and timeformat (#572)
* add timezone and timeformat

* rm time zone and keep time format

Co-authored-by: Tony Wang <tonywang.data@gmail.com>
2021-03-20 16:36:19 +08:00
Kevin Wan
4884a7b3c6 zrpc timeout & unit tests (#573)
* zrpc timeout & unit tests
2021-03-19 18:41:26 +08:00
Kevin Wan
3c6951577d make hijack more stable (#565) 2021-03-15 20:11:09 +08:00
Kevin Wan
fcd15c9b17 refactor, and add comments to describe graceful shutdown (#564) 2021-03-14 08:51:10 +08:00
Kevin Wan
155e6061cb fix golint issues (#561) 2021-03-12 23:08:04 +08:00
anqiansong
dda7666097 Feature mongo gen (#546)
* add feature: mongo code generation

* upgrade version

* update doc

* format code

* update update.tpl of mysql
2021-03-12 17:49:28 +08:00
hanhotfox
c954568b61 Hdel support for multiple key deletion (#542)
* Hdel support for multiple key deletion

* Hdel field -> fields

Co-authored-by: duanyan <duanyan@xiaoheiban.cn>
2021-03-12 17:47:21 +08:00
Kevin Wan
c2acc43a52 add important notes in readme (#560) 2021-03-12 16:48:25 +08:00
Kevin Wan
1a1a6f5239 add http hijack methods (#555) 2021-03-09 21:30:45 +08:00
anqiansong
60c7edf8f8 fix spelling (#551) 2021-03-08 18:23:12 +08:00
Kevin Wan
7ad86a52f3 update doc link (#552) 2021-03-08 17:56:03 +08:00
kingxt
1e4e5a02b2 rename (#543) 2021-03-04 17:13:07 +08:00
Kevin Wan
39540e21d2 fix golint issues (#540) 2021-03-03 17:16:09 +08:00
hexiaoen
b321622c95 暴露redis EvalSha 以及ScriptLoad接口 (#538)
Co-authored-by: shanehe <shanehe@zego.im>
2021-03-03 17:09:27 +08:00
kingxt
a25cba5380 fix collection breaker (#537)
* fix collection breaker

* optimized

* optimized

* optimized
2021-03-03 10:44:29 +08:00
Kevin Wan
f01472c9ea fix golint issues (#535) 2021-03-02 11:02:57 +08:00
Kevin Wan
af531cf264 fix golint issues (#533) 2021-03-02 00:11:18 +08:00
Kevin Wan
c4b2cddef7 fix golint issues (#532) 2021-03-02 00:04:12 +08:00
Kevin Wan
51de0d0620 fix golint issues in zrpc (#531) 2021-03-01 23:52:44 +08:00
anqiansong
dd393351cc patch 1.1.5 (#530) 2021-03-01 21:14:07 +08:00
Kevin Wan
655ae8034c fix golint issues in rest (#529) 2021-03-01 19:15:35 +08:00
anqiansong
d894b88c3e feature 1.1.5 (#411) 2021-03-01 17:29:07 +08:00
Kevin Wan
791e76bcf0 fix broken build (#528) 2021-02-28 23:53:58 +08:00
Kevin Wan
c566b5ff82 fix golint issues in core/stores (#527) 2021-02-28 23:02:49 +08:00
Kevin Wan
490241d639 fix golint issues in core/syncx (#526) 2021-02-28 16:16:22 +08:00
Kevin Wan
f02711a9cb golint core/discov (#525) 2021-02-27 23:56:18 +08:00
Kevin Wan
ad32f9de23 fix golint issues in core/threading (#524) 2021-02-26 16:27:04 +08:00
Kevin Wan
f309e9f80c fix golint issues in core/utils (#520)
* fix golint issues in core/utils

* fix golint issues in core/trace

* fix golint issues in core/trace
2021-02-26 16:20:47 +08:00
hao
2087ac1e89 修正http转发头字段值错误 (#521) 2021-02-26 16:17:30 +08:00
kingxt
e6ef1fca12 Code optimized (#523)
* optimized markdown generator

* optimized markdown generator

* optimized markdown generator

* add more comment

* add comment

* add comment

* add comments for rpc tool

* add comments for model tool

* add comments for model tool

* add comments for model tool

* add comments for config tool

* add comments for config tool

* add comments

* add comments

* add comments

* add comments

* add comment

* remove rpc main head info

* add comment

* optimized

Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-02-26 16:11:47 +08:00
Kevin Wan
ef146cf5ba fix golint issues in core/timex (#517) 2021-02-24 16:27:11 +08:00
Kevin Wan
04b0f26182 fix golint issues in core/stringx (#516) 2021-02-24 16:09:07 +08:00
Kevin Wan
acdaee0fb6 fix golint issues in core/stat (#515)
* change to use ServiceGroup to make it more clear

* fix golint issues in core/stat
2021-02-24 15:13:56 +08:00
Kevin Wan
56ad4776d4 fix misspelling (#513) 2021-02-23 13:53:19 +08:00
Kevin Wan
904d168f18 fix golint issues in core/service (#512) 2021-02-22 22:43:24 +08:00
Kevin Wan
4bd4981bfb fix golint issues in core/search (#509) 2021-02-22 18:58:03 +08:00
Kevin Wan
90562df826 fix golint issues in core/rescue (#508) 2021-02-22 16:47:02 +08:00
Kevin Wan
497762ab47 fix golint issues in core/queue (#507) 2021-02-22 16:38:42 +08:00
Kevin Wan
6e4c98e52d fix golint issues in core/prometheus (#506) 2021-02-22 14:55:04 +08:00
Kevin Wan
b4bb5c0323 fix broken links in readme (#505) 2021-02-22 14:13:33 +08:00
Kevin Wan
a58fac9000 fix golint issues in core/prof (#503) 2021-02-22 10:20:54 +08:00
Kevin Wan
d84e3d4b53 fix golint issues in core/proc (#502) 2021-02-22 10:07:39 +08:00
Kevin Wan
221f923fae fix golint issues in core/netx (#501) 2021-02-22 09:56:56 +08:00
Kevin Wan
bbb9126302 fix golint issues in core/mr (#500) 2021-02-22 09:47:06 +08:00
Kevin Wan
e7c9ef16fe fix golint issues in core/metric (#499) 2021-02-21 21:18:07 +08:00
Kevin Wan
8872d7cbd3 fix golint issues in core/mathx (#498) 2021-02-21 20:47:01 +08:00
Kevin Wan
334ee4213f fix golint issues in core/mapping (#497) 2021-02-20 23:18:22 +08:00
Kevin Wan
226513ed60 fix golint issues in core/logx (#496) 2021-02-20 22:45:58 +08:00
Kevin Wan
dac00d10c1 fix golint issues in core/load (#495) 2021-02-20 22:02:09 +08:00
Kevin Wan
84d2b6f8f5 fix golint issues in core/limit (#494) 2021-02-20 21:55:54 +08:00
kingxt
f98c9246b2 Code optimized (#493) 2021-02-20 19:50:03 +08:00
Kevin Wan
059027bc9d fix golint issues in core/lang (#492) 2021-02-20 18:21:23 +08:00
Kevin Wan
af68caeaf6 fix golint issues in core/jsonx (#491) 2021-02-20 16:59:31 +08:00
Zcc、
fdeacfc89f add redis bitmap command (#490)
Co-authored-by: zhoudeyu <zhoudeyu@xiaoheiban.cn>
2021-02-20 16:26:49 +08:00
Kevin Wan
5b33dd59d9 fix golint issues in core/jsontype (#489) 2021-02-20 15:07:49 +08:00
Kevin Wan
1f92bfde6a fix golint issues in core/iox (#488) 2021-02-19 18:40:26 +08:00
Kevin Wan
0c094cb2d7 fix golint issues in core/hash (#487) 2021-02-19 18:14:34 +08:00
Kevin Wan
f238290dd3 fix golint issues in core/fx (#486) 2021-02-19 17:49:39 +08:00
Kevin Wan
c376ffc351 fix golint issues in core/filex (#485) 2021-02-19 14:30:38 +08:00
Kevin Wan
802549ac7c fix golint issues in core/executors (#484) 2021-02-19 12:03:05 +08:00
Zcc、
72580dee38 redis add bitcount (#483)
Co-authored-by: zhoudeyu <zhoudeyu@xiaoheiban.cn>
2021-02-19 11:41:01 +08:00
Kevin Wan
086113c843 prevent negative timeout settings (#482)
* prevent negative timeout settings

* fix misleading comment
2021-02-19 10:44:39 +08:00
HarryWang29
d239952d2d zrpc client support block (#412) 2021-02-19 10:24:03 +08:00
Kevin Wan
7472d1e70b fix golint issues in core/errorx (#480) 2021-02-19 10:08:38 +08:00
Kevin Wan
2446d8a668 fix golint issues in core/discov (#479) 2021-02-18 22:56:35 +08:00
Kevin Wan
f6894448bd fix golint issues in core/contextx (#477) 2021-02-18 18:00:20 +08:00
Kevin Wan
425be6b4a1 fix golint issues in core/conf (#476) 2021-02-18 15:56:19 +08:00
Kevin Wan
457048bfac fix golint issues in core/collection, refine cache interface (#475) 2021-02-18 15:49:56 +08:00
kingxt
f14ab70035 Code optimized (#474)
* optimized markdown generator

* optimized markdown generator

* optimized markdown generator

* optimized markdown generator
2021-02-18 15:08:20 +08:00
Kevin Wan
8f1c88e07d fix golint issues in core/codec (#473) 2021-02-18 14:11:09 +08:00
Kevin Wan
9602494454 fix issue #469 (#471) 2021-02-17 21:42:22 +08:00
Kevin Wan
38abfb80ed fix gocyclo warnings (#468) 2021-02-17 14:01:05 +08:00
Kevin Wan
87938bcc09 fix golint issues in core/cmdline (#467) 2021-02-17 11:08:30 +08:00
Kevin Wan
8ebf6750b9 fix golint issues in core/breaker (#466) 2021-02-17 10:45:55 +08:00
Kevin Wan
6f92daae12 fix golint issues in core/bloom (#465) 2021-02-17 09:58:35 +08:00
Kevin Wan
80e1c85b50 add more tests for service (#463) 2021-02-11 23:48:19 +08:00
Kevin Wan
395a1db22f add more tests for rest (#462) 2021-02-10 23:08:48 +08:00
bittoy
28009c4224 Update serviceconf.go (#460)
add regression environment config
2021-02-09 15:35:50 +08:00
Kevin Wan
211f3050e9 fix golint issues (#459) 2021-02-09 14:10:38 +08:00
Kevin Wan
03b5fd4a10 fix golint issues (#458) 2021-02-09 14:03:19 +08:00
Kevin Wan
5e969cbef0 fix golint issues, else blocks (#457) 2021-02-09 13:50:21 +08:00
Kevin Wan
42883d0899 fix golint issues, redis methods (#455) 2021-02-09 10:58:11 +08:00
Kevin Wan
06f6dc9937 fix golint issues, package comments (#454) 2021-02-08 22:31:52 +08:00
Kevin Wan
1789b12db2 move examples into zero-examples (#453)
* move examples to zero-examples

* tidy go.mod

* add examples refer in readme
2021-02-08 22:23:36 +08:00
Kevin Wan
c7f3e6119d remove images, use zero-doc instead (#452) 2021-02-08 21:57:40 +08:00
Kevin Wan
54414db91d fix golint issues, exported doc (#451) 2021-02-08 21:31:56 +08:00
Kevin Wan
9b0625bb83 fix golint issues (#450) 2021-02-08 17:08:40 +08:00
Kevin Wan
0dda05fd57 add api doc (#449) 2021-02-08 11:10:55 +08:00
Kevin Wan
5b79ba2618 add discov tests (#448) 2021-02-07 20:24:47 +08:00
Kevin Wan
22a1fa649e remove etcd facade, added for testing purpose (#447) 2021-02-07 19:07:15 +08:00
Kevin Wan
745e76c335 add more tests for stores (#446) 2021-02-07 17:22:47 +08:00
Kevin Wan
852891dbd8 add more tests for stores (#445) 2021-02-07 15:27:01 +08:00
Kevin Wan
316195e912 add more tests for mongoc (#443) 2021-02-07 14:41:00 +08:00
Kevin Wan
8e889d694d add more tests for sqlx (#442)
* add more tests for sqlx

* add more tests for sqlx
2021-02-07 11:54:41 +08:00
Kevin Wan
ec6132b754 add more tests for zrpc (#441) 2021-02-06 12:25:45 +08:00
Kevin Wan
c282bb1d86 add more tests for sqlx (#440) 2021-02-05 22:53:21 +08:00
Kevin Wan
d04b54243d add more tests for proc (#439) 2021-02-05 15:11:27 +08:00
Kevin Wan
b88ba14597 fixes issue #425 (#438) 2021-02-05 13:32:56 +08:00
理工男
7b3c3de35e ring struct add lock (#434)
Co-authored-by: liuhuan210 <liuhuan210@jd.com>
2021-02-03 21:41:10 +08:00
Kevin Wan
abab7c2852 Update readme.md 2021-02-03 15:43:35 +08:00
Kevin Wan
30f5ab0b99 update readme for broken links (#432) 2021-02-03 12:02:22 +08:00
foyon
8b273a075c Support redis command Rpop (#431)
* ss

* ss

* add go-zero:stores:redis-command:Rpop and redis_test

* Delete 1.go

* support redis command Rpop

Co-authored-by: fanhongyi <fanhongyi@tal.com>
2021-02-03 10:19:42 +08:00
Liang Zheng
76026fc211 fix readme.md error (#429)
Signed-off-by: Liang Zheng <microyahoo@163.com>
2021-02-03 10:18:28 +08:00
Hkesd
04284e31cd support hscan in redis (#428) 2021-02-02 17:02:18 +08:00
Kevin Wan
c3b9c3c5ab use english readme as default, because of github ranking (#427) 2021-02-02 16:58:45 +08:00
FengZhang
a8b550e7ef Modify the http content-length max range : 30MB --> 32MB (#424)
Because we are programmer :)
2021-01-30 18:49:33 +08:00
FengZhang
cbfbebed00 modify the maximum content-length to 30MB (#413) 2021-01-29 22:14:48 +08:00
kingxt
2b07f22672 optimize code (#417)
* optimize code

* optimize code

* optimize code

* optimize code
2021-01-26 17:37:22 +08:00
Kevin Wan
a784982030 support zunionstore in redis (#410) 2021-01-21 21:03:24 +08:00
Kevin Wan
ebec5aafab use env if necessary in loading config (#409) 2021-01-21 19:33:34 +08:00
Kevin Wan
572b32729f update goctl version to 1.1.3 (#402) 2021-01-18 16:34:00 +08:00
kingxt
43e712d86a fix type convert error (#395) 2021-01-16 18:24:11 +08:00
kingxt
4db20677f7 optimized (#392) 2021-01-15 11:36:37 +08:00
Kevin Wan
6887fb22de add more tests for codec (#391) 2021-01-14 23:39:44 +08:00
Kevin Wan
50fbdbcfd7 update readme (#390) 2021-01-14 22:26:31 +08:00
ALMAS
c77b8489d7 Update periodicalexecutor.go (#389) 2021-01-14 22:20:09 +08:00
Kevin Wan
eca4ed2cc0 format code (#386) 2021-01-14 13:24:24 +08:00
685 changed files with 9570 additions and 12535 deletions

102
CONTRIBUTING.md Normal file
View File

@@ -0,0 +1,102 @@
# Contributing
Welcome to go-zero!
- [Before you get started](#before-you-get-started)
- [Code of Conduct](#code-of-conduct)
- [Community Expectations](#community-expectations)
- [Getting started](#getting-started)
- [Your First Contribution](#your-first-contribution)
- [Find something to work on](#find-something-to-work-on)
- [Find a good first topic](#find-a-good-first-topic)
- [Work on an Issue](#work-on-an-issue)
- [File an Issue](#file-an-issue)
- [Contributor Workflow](#contributor-workflow)
- [Creating Pull Requests](#creating-pull-requests)
- [Code Review](#code-review)
- [Testing](#testing)
# Before you get started
## Code of Conduct
Please make sure to read and observe our [Code of Conduct](/code-of-conduct.md).
## Community Expectations
go-zero is a community project driven by its community which strives to promote a healthy, friendly and productive environment.
go-zero is a web and rpc framework written in Go. It's born to ensure the stability of the busy sites with resilient design. Builtin goctl greatly improves the development productivity.
# Getting started
- Fork the repository on GitHub.
- Make your changes on your fork repository.
- Submit a PR.
# Your First Contribution
We will help you to contribute in different areas like filing issues, developing features, fixing critical bugs and
getting your work reviewed and merged.
If you have questions about the development process,
feel free to [file an issue](https://github.com/tal-tech/go-zero/issues/new/choose).
## Find something to work on
We are always in need of help, be it fixing documentation, reporting bugs or writing some code.
Look at places where you feel best coding practices aren't followed, code refactoring is needed or tests are missing.
Here is how you get started.
### Find a good first topic
[go-zero](https://github.com/tal-tech/go-zero) has beginner-friendly issues that provide a good first issue.
For example, [go-zero](https://github.com/tal-tech/go-zero) has
[help wanted](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22) and
[good first issue](https://github.com/tal-tech/go-zero/issues?q=is%3Aopen+is%3Aissue+label%3A%22good+first+issue%22)
labels for issues that should not need deep knowledge of the system.
We can help new contributors who wish to work on such issues.
Another good way to contribute is to find a documentation improvement, such as a missing/broken link.
Please see [Contributing](#contributing) below for the workflow.
#### Work on an issue
When you are willing to take on an issue, just reply on the issue. The maintainer will assign it to you.
### File an Issue
While we encourage everyone to contribute code, it is also appreciated when someone reports an issue.
Please follow the prompted submission guidelines while opening an issue.
# Contributor Workflow
Please do not ever hesitate to ask a question or send a pull request.
This is a rough outline of what a contributor's workflow looks like:
- Create a topic branch from where to base the contribution. This is usually master.
- Make commits of logical units.
- Push changes in a topic branch to a personal fork of the repository.
- Submit a pull request to [go-zero](https://github.com/tal-tech/go-zero).
## Creating Pull Requests
Pull requests are often called simply "PR".
go-zero generally follows the standard [github pull request](https://help.github.com/articles/about-pull-requests/) process.
To submit a proposed change, please develop the code/fix and add new test cases.
After that, run these local verifications before submitting your pull request to predict whether
continuous integration will pass or fail.
* Format the code with `gofmt`
* Run the test with data race enabled `go test -race ./...`
## Code Review
To make it easier for your PR to receive reviews, consider the reviewers will need you to:
* follow [good coding guidelines](https://github.com/golang/go/wiki/CodeReviewComments).
* write [good commit messages](https://chris.beams.io/posts/git-commit/).
* break large changes into a logical series of smaller patches which individually make easily understandable changes, and in aggregate solve a broader issue.

21
ROADMAP.md Normal file
View File

@@ -0,0 +1,21 @@
# go-zero Roadmap
This document defines a high level roadmap for go-zero development and upcoming releases.
Community and contributor involvement is vital for successfully implementing all desired items for each release.
We hope that the items listed below will inspire further engagement from the community to keep go-zero progressing and shipping exciting and valuable features.
## 2021 Q2
- Support TLS in redis connections
- Support service discovery through K8S watch api
- Log full sql statements for easier sql problem solving
## 2021 Q3
- Support `goctl mock` command to start a mocking server with given `.api` file
- Adapt builtin tracing mechanism to opentracing solutions
- Support `goctl model pg` to support PostgreSQL code generation
## 2021 Q4
- Support `goctl doctor` command to report potential issues for given service
- Support `context` in redis related methods for timeout and tracing
- Support `context` in sql related methods for timeout and tracing
- Support `context` in mongodb related methods for timeout and tracing

76
code-of-conduct.md Normal file
View File

@@ -0,0 +1,76 @@
# Contributor Covenant Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to make participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, sex characteristics, gender identity and expression,
level of experience, education, socio-economic status, nationality, personal
appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment
include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.
## Scope
This Code of Conduct applies within all project spaces, and it also applies when
an individual is representing the project or its community in public spaces.
Examples of representing a project or community include using an official
project e-mail address, posting via an official social media account, or acting
as an appointed representative at an online or offline event. Representation of
a project may be further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at [INSERT EMAIL ADDRESS]. All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq

View File

@@ -27,43 +27,43 @@ return true
`
)
// ErrTooLargeOffset indicates the offset is too large in bitset.
var ErrTooLargeOffset = errors.New("too large offset")
type (
BitSetProvider interface {
// A Filter is a bloom filter.
Filter struct {
bits uint
bitSet bitSetProvider
}
bitSetProvider interface {
check([]uint) (bool, error)
set([]uint) error
}
BloomFilter struct {
bits uint
bitSet BitSetProvider
}
)
// New create a BloomFilter, store is the backed redis, key is the key for the bloom filter,
// New create a Filter, store is the backed redis, key is the key for the bloom filter,
// bits is how many bits will be used, maps is how many hashes for each addition.
// best practices:
// elements - means how many actual elements
// when maps = 14, formula: 0.7*(bits/maps), bits = 20*elements, the error rate is 0.000067 < 1e-4
// for detailed error rate table, see http://pages.cs.wisc.edu/~cao/papers/summary-cache/node8.html
func New(store *redis.Redis, key string, bits uint) *BloomFilter {
return &BloomFilter{
func New(store *redis.Redis, key string, bits uint) *Filter {
return &Filter{
bits: bits,
bitSet: newRedisBitSet(store, key, bits),
}
}
func (f *BloomFilter) Add(data []byte) error {
// Add adds data into f.
func (f *Filter) Add(data []byte) error {
locations := f.getLocations(data)
err := f.bitSet.set(locations)
if err != nil {
return err
}
return nil
return f.bitSet.set(locations)
}
func (f *BloomFilter) Exists(data []byte) (bool, error) {
// Exists checks if data is in f.
func (f *Filter) Exists(data []byte) (bool, error) {
locations := f.getLocations(data)
isSet, err := f.bitSet.check(locations)
if err != nil {
@@ -76,7 +76,7 @@ func (f *BloomFilter) Exists(data []byte) (bool, error) {
return true, nil
}
func (f *BloomFilter) getLocations(data []byte) []uint {
func (f *Filter) getLocations(data []byte) []uint {
locations := make([]uint, maps)
for i := uint(0); i < maps; i++ {
hashValue := hash.Hash(append(data, byte(i)))
@@ -127,11 +127,12 @@ func (r *redisBitSet) check(offsets []uint) (bool, error) {
return false, err
}
if exists, ok := resp.(int64); !ok {
exists, ok := resp.(int64)
if !ok {
return false, nil
} else {
return exists == 1, nil
}
return exists == 1, nil
}
func (r *redisBitSet) del() error {
@@ -152,7 +153,7 @@ func (r *redisBitSet) set(offsets []uint) error {
_, err = r.store.Eval(setScript, []string{r.key}, args)
if err == redis.Nil {
return nil
} else {
return err
}
return err
}

View File

@@ -18,12 +18,14 @@ const (
timeFormat = "15:04:05"
)
// ErrServiceUnavailable is returned when the CB state is open
// ErrServiceUnavailable is returned when the Breaker state is open.
var ErrServiceUnavailable = errors.New("circuit breaker is open")
type (
// Acceptable is the func to check if the error can be accepted.
Acceptable func(err error) bool
// A Breaker represents a circuit breaker.
Breaker interface {
// Name returns the name of the Breaker.
Name() string
@@ -61,10 +63,14 @@ type (
DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
}
// Option defines the method to customize a Breaker.
Option func(breaker *circuitBreaker)
// Promise interface defines the callbacks that returned by Breaker.Allow.
Promise interface {
// Accept tells the Breaker that the call is successful.
Accept()
// Reject tells the Breaker that the call is failed.
Reject(reason string)
}
@@ -89,6 +95,8 @@ type (
}
)
// NewBreaker returns a Breaker object.
// opts can be used to customize the Breaker.
func NewBreaker(opts ...Option) Breaker {
var b circuitBreaker
for _, opt := range opts {
@@ -127,6 +135,7 @@ func (cb *circuitBreaker) Name() string {
return cb.name
}
// WithName returns a function to set the name of a Breaker.
func WithName(name string) Option {
return func(b *circuitBreaker) {
b.name = name

View File

@@ -122,8 +122,7 @@ func BenchmarkGoogleBreaker(b *testing.B) {
}
}
type mockedPromise struct {
}
type mockedPromise struct{}
func (m *mockedPromise) Accept() {
}

View File

@@ -7,24 +7,28 @@ var (
breakers = make(map[string]Breaker)
)
// Do calls Breaker.Do on the Breaker with given name.
func Do(name string, req func() error) error {
return do(name, func(b Breaker) error {
return b.Do(req)
})
}
// DoWithAcceptable calls Breaker.DoWithAcceptable on the Breaker with given name.
func DoWithAcceptable(name string, req func() error, acceptable Acceptable) error {
return do(name, func(b Breaker) error {
return b.DoWithAcceptable(req, acceptable)
})
}
// DoWithFallback calls Breaker.DoWithFallback on the Breaker with given name.
func DoWithFallback(name string, req func() error, fallback func(err error) error) error {
return do(name, func(b Breaker) error {
return b.DoWithFallback(req, fallback)
})
}
// DoWithFallbackAcceptable calls Breaker.DoWithFallbackAcceptable on the Breaker with given name.
func DoWithFallbackAcceptable(name string, req func() error, fallback func(err error) error,
acceptable Acceptable) error {
return do(name, func(b Breaker) error {
@@ -32,6 +36,7 @@ func DoWithFallbackAcceptable(name string, req func() error, fallback func(err e
})
}
// GetBreaker returns the Breaker with the given name.
func GetBreaker(name string) Breaker {
lock.RLock()
b, ok := breakers[name]
@@ -51,7 +56,8 @@ func GetBreaker(name string) Breaker {
return b
}
func NoBreakFor(name string) {
// NoBreakerFor disables the circuit breaker for the given name.
func NoBreakerFor(name string) {
lock.Lock()
breakers[name] = newNoOpBreaker()
lock.Unlock()

View File

@@ -55,7 +55,7 @@ func TestBreakersDoWithAcceptable(t *testing.T) {
}
func TestBreakersNoBreakerFor(t *testing.T) {
NoBreakFor("any")
NoBreakerFor("any")
errDummy := errors.New("any")
for i := 0; i < 10000; i++ {
assert.Equal(t, errDummy, GetBreaker("any").Do(func() error {

View File

@@ -64,9 +64,9 @@ func (b *googleBreaker) doReq(req func() error, fallback func(err error) error,
if err := b.accept(); err != nil {
if fallback != nil {
return fallback(err)
} else {
return err
}
return err
}
defer func() {
@@ -94,7 +94,7 @@ func (b *googleBreaker) markFailure() {
b.stat.Add(0)
}
func (b *googleBreaker) history() (accepts int64, total int64) {
func (b *googleBreaker) history() (accepts, total int64) {
b.stat.Reduce(func(b *collection.Bucket) {
accepts += int64(b.Sum)
total += b.Count

View File

@@ -7,11 +7,13 @@ import (
"strings"
)
// EnterToContinue let stdin waiting for an enter key to continue.
func EnterToContinue() {
fmt.Print("Press 'Enter' to continue...")
bufio.NewReader(os.Stdin).ReadBytes('\n')
}
// ReadLine shows prompt to stdout and read a line from stdin.
func ReadLine(prompt string) string {
fmt.Print(prompt)
input, _ := bufio.NewReader(os.Stdin).ReadString('\n')

View File

@@ -10,6 +10,7 @@ import (
"github.com/tal-tech/go-zero/core/logx"
)
// ErrPaddingSize indicates bad padding size.
var ErrPaddingSize = errors.New("padding size error")
type ecb struct {
@@ -26,6 +27,7 @@ func newECB(b cipher.Block) *ecb {
type ecbEncrypter ecb
// NewECBEncrypter returns an ECB encrypter.
func NewECBEncrypter(b cipher.Block) cipher.BlockMode {
return (*ecbEncrypter)(newECB(b))
}
@@ -52,6 +54,7 @@ func (x *ecbEncrypter) CryptBlocks(dst, src []byte) {
type ecbDecrypter ecb
// NewECBDecrypter returns an ECB decrypter.
func NewECBDecrypter(b cipher.Block) cipher.BlockMode {
return (*ecbDecrypter)(newECB(b))
}
@@ -78,6 +81,7 @@ func (x *ecbDecrypter) CryptBlocks(dst, src []byte) {
}
}
// EcbDecrypt decrypts src with the given key.
func EcbDecrypt(key, src []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
@@ -92,6 +96,8 @@ func EcbDecrypt(key, src []byte) ([]byte, error) {
return pkcs5Unpadding(decrypted, decrypter.BlockSize())
}
// EcbDecryptBase64 decrypts base64 encoded src with the given base64 encoded key.
// The returned string is also base64 encoded.
func EcbDecryptBase64(key, src string) (string, error) {
keyBytes, err := getKeyBytes(key)
if err != nil {
@@ -111,6 +117,7 @@ func EcbDecryptBase64(key, src string) (string, error) {
return base64.StdEncoding.EncodeToString(decryptedBytes), nil
}
// EcbEncrypt encrypts src with the given key.
func EcbEncrypt(key, src []byte) ([]byte, error) {
block, err := aes.NewCipher(key)
if err != nil {
@@ -126,6 +133,8 @@ func EcbEncrypt(key, src []byte) ([]byte, error) {
return crypted, nil
}
// EcbEncryptBase64 encrypts base64 encoded src with the given base64 encoded key.
// The returned string is also base64 encoded.
func EcbEncryptBase64(key, src string) (string, error) {
keyBytes, err := getKeyBytes(key)
if err != nil {
@@ -146,15 +155,16 @@ func EcbEncryptBase64(key, src string) (string, error) {
}
func getKeyBytes(key string) ([]byte, error) {
if len(key) > 32 {
if keyBytes, err := base64.StdEncoding.DecodeString(key); err != nil {
return nil, err
} else {
return keyBytes, nil
}
if len(key) <= 32 {
return []byte(key), nil
}
return []byte(key), nil
keyBytes, err := base64.StdEncoding.DecodeString(key)
if err != nil {
return nil, err
}
return keyBytes, nil
}
func pkcs5Padding(ciphertext []byte, blockSize int) []byte {

65
core/codec/aesecb_test.go Normal file
View File

@@ -0,0 +1,65 @@
package codec
import (
"encoding/base64"
"testing"
"github.com/stretchr/testify/assert"
)
func TestAesEcb(t *testing.T) {
var (
key = []byte("q4t7w!z%C*F-JaNdRgUjXn2r5u8x/A?D")
val = []byte("hello")
badKey1 = []byte("aaaaaaaaa")
// more than 32 chars
badKey2 = []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
)
_, err := EcbEncrypt(badKey1, val)
assert.NotNil(t, err)
_, err = EcbEncrypt(badKey2, val)
assert.NotNil(t, err)
dst, err := EcbEncrypt(key, val)
assert.Nil(t, err)
_, err = EcbDecrypt(badKey1, dst)
assert.NotNil(t, err)
_, err = EcbDecrypt(badKey2, dst)
assert.NotNil(t, err)
_, err = EcbDecrypt(key, val)
// not enough block, just nil
assert.Nil(t, err)
src, err := EcbDecrypt(key, dst)
assert.Nil(t, err)
assert.Equal(t, val, src)
}
func TestAesEcbBase64(t *testing.T) {
const (
val = "hello"
badKey1 = "aaaaaaaaa"
// more than 32 chars
badKey2 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
)
key := []byte("q4t7w!z%C*F-JaNdRgUjXn2r5u8x/A?D")
b64Key := base64.StdEncoding.EncodeToString(key)
b64Val := base64.StdEncoding.EncodeToString([]byte(val))
_, err := EcbEncryptBase64(badKey1, val)
assert.NotNil(t, err)
_, err = EcbEncryptBase64(badKey2, val)
assert.NotNil(t, err)
_, err = EcbEncryptBase64(b64Key, val)
assert.NotNil(t, err)
dst, err := EcbEncryptBase64(b64Key, b64Val)
assert.Nil(t, err)
_, err = EcbDecryptBase64(badKey1, dst)
assert.NotNil(t, err)
_, err = EcbDecryptBase64(badKey2, dst)
assert.NotNil(t, err)
_, err = EcbDecryptBase64(b64Key, val)
assert.NotNil(t, err)
src, err := EcbDecryptBase64(b64Key, dst)
assert.Nil(t, err)
b, err := base64.StdEncoding.DecodeString(src)
assert.Nil(t, err)
assert.Equal(t, val, string(b))
}

View File

@@ -11,8 +11,11 @@ import (
// 2048-bit MODP Group
var (
ErrInvalidPriKey = errors.New("invalid private key")
ErrInvalidPubKey = errors.New("invalid public key")
// ErrInvalidPriKey indicates the invalid private key.
ErrInvalidPriKey = errors.New("invalid private key")
// ErrInvalidPubKey indicates the invalid public key.
ErrInvalidPubKey = errors.New("invalid public key")
// ErrPubKeyOutOfBound indicates the public key is out of bound.
ErrPubKeyOutOfBound = errors.New("public key out of bound")
p, _ = new(big.Int).SetString("FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF", 16)
@@ -20,11 +23,13 @@ var (
zero = big.NewInt(0)
)
// DhKey defines the Diffie Hellman key.
type DhKey struct {
PriKey *big.Int
PubKey *big.Int
}
// ComputeKey returns a key from public key and private key.
func ComputeKey(pubKey, priKey *big.Int) (*big.Int, error) {
if pubKey == nil {
return nil, ErrInvalidPubKey
@@ -41,6 +46,7 @@ func ComputeKey(pubKey, priKey *big.Int) (*big.Int, error) {
return new(big.Int).Exp(pubKey, priKey, p), nil
}
// GenerateKey returns a Diffie Hellman key.
func GenerateKey() (*DhKey, error) {
var err error
var x *big.Int
@@ -63,10 +69,12 @@ func GenerateKey() (*DhKey, error) {
return key, nil
}
// NewPublicKey returns a public key from the given bytes.
func NewPublicKey(bs []byte) *big.Int {
return new(big.Int).SetBytes(bs)
}
// Bytes returns public key bytes.
func (k *DhKey) Bytes() []byte {
if k.PubKey == nil {
return nil

View File

@@ -8,6 +8,7 @@ import (
const unzipLimit = 100 * 1024 * 1024 // 100MB
// Gzip compresses bs.
func Gzip(bs []byte) []byte {
var b bytes.Buffer
@@ -18,6 +19,7 @@ func Gzip(bs []byte) []byte {
return b.Bytes()
}
// Gunzip uncompresses bs.
func Gunzip(bs []byte) ([]byte, error) {
r, err := gzip.NewReader(bytes.NewBuffer(bs))
if err != nil {

View File

@@ -7,12 +7,14 @@ import (
"io"
)
// Hmac returns HMAC bytes for body with the given key.
func Hmac(key []byte, body string) []byte {
h := hmac.New(sha256.New, key)
io.WriteString(h, body)
return h.Sum(nil)
}
// HmacBase64 returns the base64 encoded string of HMAC for body with the given key.
func HmacBase64(key []byte, body string) string {
return base64.StdEncoding.EncodeToString(Hmac(key, body))
}

View File

@@ -11,17 +11,22 @@ import (
)
var (
// ErrPrivateKey indicates the invalid private key.
ErrPrivateKey = errors.New("private key error")
ErrPublicKey = errors.New("failed to parse PEM block containing the public key")
ErrNotRsaKey = errors.New("key type is not RSA")
// ErrPublicKey indicates the invalid public key.
ErrPublicKey = errors.New("failed to parse PEM block containing the public key")
// ErrNotRsaKey indicates the invalid RSA key.
ErrNotRsaKey = errors.New("key type is not RSA")
)
type (
// RsaDecrypter represents a RSA decrypter.
RsaDecrypter interface {
Decrypt(input []byte) ([]byte, error)
DecryptBase64(input string) ([]byte, error)
}
// RsaEncrypter represents a RSA encrypter.
RsaEncrypter interface {
Encrypt(input []byte) ([]byte, error)
}
@@ -41,6 +46,7 @@ type (
}
)
// NewRsaDecrypter returns a RsaDecrypter with the given file.
func NewRsaDecrypter(file string) (RsaDecrypter, error) {
content, err := ioutil.ReadFile(file)
if err != nil {
@@ -84,6 +90,7 @@ func (r *rsaDecrypter) DecryptBase64(input string) ([]byte, error) {
return r.Decrypt(base64Decoded)
}
// NewRsaEncrypter returns a RsaEncrypter with the given key.
func NewRsaEncrypter(key []byte) (RsaEncrypter, error) {
block, _ := pem.Decode(key)
if block == nil {

View File

@@ -23,8 +23,10 @@ const (
var emptyLruCache = emptyLru{}
type (
// CacheOption defines the method to customize a Cache.
CacheOption func(cache *Cache)
// A Cache object is a in-memory cache.
Cache struct {
name string
lock sync.Mutex
@@ -38,6 +40,7 @@ type (
}
)
// NewCache returns a Cache with given expire.
func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
cache := &Cache{
data: make(map[string]interface{}),
@@ -72,6 +75,7 @@ func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
return cache, nil
}
// Del deletes the item with the given key from c.
func (c *Cache) Del(key string) {
c.lock.Lock()
delete(c.data, key)
@@ -80,6 +84,7 @@ func (c *Cache) Del(key string) {
c.timingWheel.RemoveTimer(key)
}
// Get returns the item with the given key from c.
func (c *Cache) Get(key string) (interface{}, bool) {
value, ok := c.doGet(key)
if ok {
@@ -91,6 +96,7 @@ func (c *Cache) Get(key string) (interface{}, bool) {
return value, ok
}
// Set sets value into c with key.
func (c *Cache) Set(key string, value interface{}) {
c.lock.Lock()
_, ok := c.data[key]
@@ -106,6 +112,9 @@ func (c *Cache) Set(key string, value interface{}) {
}
}
// Take returns the item with the given key.
// If the item is in c, return it directly.
// If not, use fetch method to get the item, set into c and return it.
func (c *Cache) Take(key string, fetch func() (interface{}, error)) (interface{}, error) {
if val, ok := c.doGet(key); ok {
c.stats.IncrementHit()
@@ -136,11 +145,10 @@ func (c *Cache) Take(key string, fetch func() (interface{}, error)) (interface{}
if fresh {
c.stats.IncrementMiss()
return val, nil
} else {
// got the result from previous ongoing query
c.stats.IncrementHit()
}
// got the result from previous ongoing query
c.stats.IncrementHit()
return val, nil
}
@@ -168,6 +176,7 @@ func (c *Cache) size() int {
return len(c.data)
}
// WithLimit customizes a Cache with items up to limit.
func WithLimit(limit int) CacheOption {
return func(cache *Cache) {
if limit > 0 {
@@ -176,6 +185,7 @@ func WithLimit(limit int) CacheOption {
}
}
// WithName customizes a Cache with the given name.
func WithName(name string) CacheOption {
return func(cache *Cache) {
cache.name = name

View File

@@ -2,6 +2,7 @@ package collection
import "sync"
// A Queue is a FIFO queue.
type Queue struct {
lock sync.Mutex
elements []interface{}
@@ -11,6 +12,7 @@ type Queue struct {
count int
}
// NewQueue returns a Queue object.
func NewQueue(size int) *Queue {
return &Queue{
elements: make([]interface{}, size),
@@ -18,6 +20,7 @@ func NewQueue(size int) *Queue {
}
}
// Empty checks if q is empty.
func (q *Queue) Empty() bool {
q.lock.Lock()
empty := q.count == 0
@@ -26,6 +29,7 @@ func (q *Queue) Empty() bool {
return empty
}
// Put puts element into q at the last position.
func (q *Queue) Put(element interface{}) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -44,6 +48,7 @@ func (q *Queue) Put(element interface{}) {
q.count++
}
// Take takes the first element out of q if not empty.
func (q *Queue) Take() (interface{}, bool) {
q.lock.Lock()
defer q.lock.Unlock()

View File

@@ -1,10 +1,15 @@
package collection
import "sync"
// A Ring can be used as fixed size ring.
type Ring struct {
elements []interface{}
index int
lock sync.Mutex
}
// NewRing returns a Ring object with the given size n.
func NewRing(n int) *Ring {
if n < 1 {
panic("n should be greater than 0")
@@ -15,12 +20,20 @@ func NewRing(n int) *Ring {
}
}
// Add adds v into r.
func (r *Ring) Add(v interface{}) {
r.lock.Lock()
defer r.lock.Unlock()
r.elements[r.index%len(r.elements)] = v
r.index++
}
// Take takes all items from r.
func (r *Ring) Take() []interface{} {
r.lock.Lock()
defer r.lock.Unlock()
var size int
var start int
if r.index > len(r.elements) {

View File

@@ -1,6 +1,7 @@
package collection
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
@@ -29,3 +30,30 @@ func TestRingMore(t *testing.T) {
elements := ring.Take()
assert.ElementsMatch(t, []interface{}{6, 7, 8, 9, 10}, elements)
}
func TestRingAdd(t *testing.T) {
ring := NewRing(5051)
wg := sync.WaitGroup{}
for i := 1; i <= 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 1; j <= i; j++ {
ring.Add(i)
}
}(i)
}
wg.Wait()
assert.Equal(t, 5050, len(ring.Take()))
}
func BenchmarkRingAdd(b *testing.B) {
ring := NewRing(500)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for i := 0; i < b.N; i++ {
ring.Add(i)
}
}
})
}

View File

@@ -73,9 +73,9 @@ func (rw *RollingWindow) span() int {
offset := int(timex.Since(rw.lastTime) / rw.interval)
if 0 <= offset && offset < rw.size {
return offset
} else {
return rw.size
}
return rw.size
}
func (rw *RollingWindow) updateOffset() {

View File

@@ -139,7 +139,7 @@ func TestRollingWindowBucketTimeBoundary(t *testing.T) {
func TestRollingWindowDataRace(t *testing.T) {
const size = 3
r := NewRollingWindow(size, duration)
var stop = make(chan bool)
stop := make(chan bool)
go func() {
for {
select {

View File

@@ -18,6 +18,7 @@ type SafeMap struct {
dirtyNew map[interface{}]interface{}
}
// NewSafeMap returns a SafeMap.
func NewSafeMap() *SafeMap {
return &SafeMap{
dirtyOld: make(map[interface{}]interface{}),
@@ -25,6 +26,7 @@ func NewSafeMap() *SafeMap {
}
}
// Del deletes the value with the given key from m.
func (m *SafeMap) Del(key interface{}) {
m.lock.Lock()
if _, ok := m.dirtyOld[key]; ok {
@@ -53,18 +55,20 @@ func (m *SafeMap) Del(key interface{}) {
m.lock.Unlock()
}
// Get gets the value with the given key from m.
func (m *SafeMap) Get(key interface{}) (interface{}, bool) {
m.lock.RLock()
defer m.lock.RUnlock()
if val, ok := m.dirtyOld[key]; ok {
return val, true
} else {
val, ok := m.dirtyNew[key]
return val, ok
}
val, ok := m.dirtyNew[key]
return val, ok
}
// Set sets the value into m with the given key.
func (m *SafeMap) Set(key, value interface{}) {
m.lock.Lock()
if m.deletionOld <= maxDeletion {
@@ -83,6 +87,7 @@ func (m *SafeMap) Set(key, value interface{}) {
m.lock.Unlock()
}
// Size returns the size of m.
func (m *SafeMap) Size() int {
m.lock.RLock()
size := len(m.dirtyOld) + len(m.dirtyNew)

View File

@@ -21,6 +21,7 @@ type Set struct {
tp int
}
// NewSet returns a managed Set, can only put the values with the same type.
func NewSet() *Set {
return &Set{
data: make(map[interface{}]lang.PlaceholderType),
@@ -28,6 +29,7 @@ func NewSet() *Set {
}
}
// NewUnmanagedSet returns a unmanaged Set, which can put values with different types.
func NewUnmanagedSet() *Set {
return &Set{
data: make(map[interface{}]lang.PlaceholderType),
@@ -35,42 +37,49 @@ func NewUnmanagedSet() *Set {
}
}
// Add adds i into s.
func (s *Set) Add(i ...interface{}) {
for _, each := range i {
s.add(each)
}
}
// AddInt adds int values ii into s.
func (s *Set) AddInt(ii ...int) {
for _, each := range ii {
s.add(each)
}
}
// AddInt64 adds int64 values ii into s.
func (s *Set) AddInt64(ii ...int64) {
for _, each := range ii {
s.add(each)
}
}
// AddUint adds uint values ii into s.
func (s *Set) AddUint(ii ...uint) {
for _, each := range ii {
s.add(each)
}
}
// AddUint64 adds uint64 values ii into s.
func (s *Set) AddUint64(ii ...uint64) {
for _, each := range ii {
s.add(each)
}
}
// AddStr adds string values ss into s.
func (s *Set) AddStr(ss ...string) {
for _, each := range ss {
s.add(each)
}
}
// Contains checks if i is in s.
func (s *Set) Contains(i interface{}) bool {
if len(s.data) == 0 {
return false
@@ -81,6 +90,7 @@ func (s *Set) Contains(i interface{}) bool {
return ok
}
// Keys returns the keys in s.
func (s *Set) Keys() []interface{} {
var keys []interface{}
@@ -91,6 +101,7 @@ func (s *Set) Keys() []interface{} {
return keys
}
// KeysInt returns the int keys in s.
func (s *Set) KeysInt() []int {
var keys []int
@@ -105,6 +116,7 @@ func (s *Set) KeysInt() []int {
return keys
}
// KeysInt64 returns int64 keys in s.
func (s *Set) KeysInt64() []int64 {
var keys []int64
@@ -119,6 +131,7 @@ func (s *Set) KeysInt64() []int64 {
return keys
}
// KeysUint returns uint keys in s.
func (s *Set) KeysUint() []uint {
var keys []uint
@@ -133,6 +146,7 @@ func (s *Set) KeysUint() []uint {
return keys
}
// KeysUint64 returns uint64 keys in s.
func (s *Set) KeysUint64() []uint64 {
var keys []uint64
@@ -147,6 +161,7 @@ func (s *Set) KeysUint64() []uint64 {
return keys
}
// KeysStr returns string keys in s.
func (s *Set) KeysStr() []string {
var keys []string
@@ -161,11 +176,13 @@ func (s *Set) KeysStr() []string {
return keys
}
// Remove removes i from s.
func (s *Set) Remove(i interface{}) {
s.validate(i)
delete(s.data, i)
}
// Count returns the number of items in s.
func (s *Set) Count() int {
return len(s.data)
}

View File

@@ -13,8 +13,10 @@ import (
const drainWorkers = 8
type (
// Execute defines the method to execute the task.
Execute func(key, value interface{})
// A TimingWheel is a timing wheel object to schedule tasks.
TimingWheel struct {
interval time.Duration
ticker timex.Ticker
@@ -54,6 +56,7 @@ type (
}
)
// NewTimingWheel returns a TimingWheel.
func NewTimingWheel(interval time.Duration, numSlots int, execute Execute) (*TimingWheel, error) {
if interval <= 0 || numSlots <= 0 || execute == nil {
return nil, fmt.Errorf("interval: %v, slots: %d, execute: %p", interval, numSlots, execute)
@@ -85,10 +88,12 @@ func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execu
return tw, nil
}
// Drain drains all items and executes them.
func (tw *TimingWheel) Drain(fn func(key, value interface{})) {
tw.drainChannel <- fn
}
// MoveTimer moves the task with the given key to the given delay.
func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration) {
if delay <= 0 || key == nil {
return
@@ -100,6 +105,7 @@ func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration) {
}
}
// RemoveTimer removes the task with the given key.
func (tw *TimingWheel) RemoveTimer(key interface{}) {
if key == nil {
return
@@ -108,6 +114,7 @@ func (tw *TimingWheel) RemoveTimer(key interface{}) {
tw.removeChannel <- key
}
// SetTimer sets the task value with the given key to the delay.
func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration) {
if delay <= 0 || key == nil {
return
@@ -122,6 +129,7 @@ func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration) {
}
}
// Stop stops tw.
func (tw *TimingWheel) Stop() {
close(tw.stopChannel)
}
@@ -143,7 +151,7 @@ func (tw *TimingWheel) drainAll(fn func(key, value interface{})) {
}
}
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos, circle int) {
steps := int(d / tw.interval)
pos = (tw.tickedPos + steps) % tw.numSlots
circle = (steps - 1) / tw.numSlots

View File

@@ -16,26 +16,43 @@ var loaders = map[string]func([]byte, interface{}) error{
".yml": LoadConfigFromYamlBytes,
}
func LoadConfig(file string, v interface{}) error {
if content, err := ioutil.ReadFile(file); err != nil {
// LoadConfig loads config into v from file, .json, .yaml and .yml are acceptable.
func LoadConfig(file string, v interface{}, opts ...Option) error {
content, err := ioutil.ReadFile(file)
if err != nil {
return err
} else if loader, ok := loaders[path.Ext(file)]; ok {
return loader([]byte(os.ExpandEnv(string(content))), v)
} else {
return fmt.Errorf("unrecoginized file type: %s", file)
}
loader, ok := loaders[path.Ext(file)]
if !ok {
return fmt.Errorf("unrecognized file type: %s", file)
}
var opt options
for _, o := range opts {
o(&opt)
}
if opt.env {
return loader([]byte(os.ExpandEnv(string(content))), v)
}
return loader(content, v)
}
// LoadConfigFromJsonBytes loads config into v from content json bytes.
func LoadConfigFromJsonBytes(content []byte, v interface{}) error {
return mapping.UnmarshalJsonBytes(content, v)
}
// LoadConfigFromYamlBytes loads config into v from content yaml bytes.
func LoadConfigFromYamlBytes(content []byte, v interface{}) error {
return mapping.UnmarshalYamlBytes(content, v)
}
func MustLoad(path string, v interface{}) {
if err := LoadConfig(path, v); err != nil {
// MustLoad loads config into v from path, exits on error.
func MustLoad(path string, v interface{}, opts ...Option) {
if err := LoadConfig(path, v, opts...); err != nil {
log.Fatalf("error: config file %s, %s", path, err.Error())
}
}

View File

@@ -30,7 +30,8 @@ func TestConfigJson(t *testing.T) {
text := `{
"a": "foo",
"b": 1,
"c": "${FOO}"
"c": "${FOO}",
"d": "abcd!@#$112"
}`
for _, test := range tests {
test := test
@@ -45,11 +46,49 @@ func TestConfigJson(t *testing.T) {
A string `json:"a"`
B int `json:"b"`
C string `json:"c"`
D string `json:"d"`
}
MustLoad(tmpfile, &val)
assert.Equal(t, "foo", val.A)
assert.Equal(t, 1, val.B)
assert.Equal(t, "${FOO}", val.C)
assert.Equal(t, "abcd!@#$112", val.D)
})
}
}
func TestConfigJsonEnv(t *testing.T) {
tests := []string{
".json",
".yaml",
".yml",
}
text := `{
"a": "foo",
"b": 1,
"c": "${FOO}",
"d": "abcd!@#$a12 3"
}`
for _, test := range tests {
test := test
t.Run(test, func(t *testing.T) {
os.Setenv("FOO", "2")
defer os.Unsetenv("FOO")
tmpfile, err := createTempFile(test, text)
assert.Nil(t, err)
defer os.Remove(tmpfile)
var val struct {
A string `json:"a"`
B int `json:"b"`
C string `json:"c"`
D string `json:"d"`
}
MustLoad(tmpfile, &val, UseEnv())
assert.Equal(t, "foo", val.A)
assert.Equal(t, 1, val.B)
assert.Equal(t, "2", val.C)
assert.Equal(t, "abcd!@# 3", val.D)
})
}
}

17
core/conf/options.go Normal file
View File

@@ -0,0 +1,17 @@
package conf
type (
// Option defines the method to customize the config options.
Option func(opt *options)
options struct {
env bool
}
)
// UseEnv customizes the config to use environment variables.
func UseEnv() Option {
return func(opt *options) {
opt.env = true
}
}

View File

@@ -2,6 +2,7 @@ package conf
import (
"fmt"
"os"
"strconv"
"strings"
"sync"
@@ -30,14 +31,19 @@ type mapBasedProperties struct {
lock sync.RWMutex
}
// Loads the properties into a properties configuration instance.
// LoadProperties loads the properties into a properties configuration instance.
// Returns an error that indicates if there was a problem loading the configuration.
func LoadProperties(filename string) (Properties, error) {
func LoadProperties(filename string, opts ...Option) (Properties, error) {
lines, err := iox.ReadTextLines(filename, iox.WithoutBlank(), iox.OmitWithPrefix("#"))
if err != nil {
return nil, err
}
var opt options
for _, o := range opts {
o(&opt)
}
raw := make(map[string]string)
for i := range lines {
pair := strings.Split(lines[i], "=")
@@ -50,7 +56,11 @@ func LoadProperties(filename string) (Properties, error) {
key := strings.TrimSpace(pair[0])
value := strings.TrimSpace(pair[1])
raw[key] = value
if opt.env {
raw[key] = os.ExpandEnv(value)
} else {
raw[key] = value
}
}
return &mapBasedProperties{
@@ -87,7 +97,7 @@ func (config *mapBasedProperties) SetInt(key string, value int) {
config.lock.Unlock()
}
// Dumps the configuration internal map into a string.
// ToString dumps the configuration internal map into a string.
func (config *mapBasedProperties) ToString() string {
config.lock.RLock()
ret := fmt.Sprintf("%s", config.properties)
@@ -96,12 +106,12 @@ func (config *mapBasedProperties) ToString() string {
return ret
}
// Returns the error message.
// Error returns the error message.
func (configError *PropertyError) Error() string {
return configError.message
}
// Builds a new properties configuration structure
// NewProperties builds a new properties configuration structure.
func NewProperties() Properties {
return &mapBasedProperties{
properties: make(map[string]string),

View File

@@ -31,6 +31,39 @@ func TestProperties(t *testing.T) {
assert.Contains(t, val, "app.threads")
}
func TestPropertiesEnv(t *testing.T) {
text := `app.name = test
app.program=app
app.env1 = ${FOO}
app.env2 = $none
# this is comment
app.threads = 5`
tmpfile, err := fs.TempFilenameWithText(text)
assert.Nil(t, err)
defer os.Remove(tmpfile)
os.Setenv("FOO", "2")
defer os.Unsetenv("FOO")
props, err := LoadProperties(tmpfile, UseEnv())
assert.Nil(t, err)
assert.Equal(t, "test", props.GetString("app.name"))
assert.Equal(t, "app", props.GetString("app.program"))
assert.Equal(t, 5, props.GetInt("app.threads"))
assert.Equal(t, "2", props.GetString("app.env1"))
assert.Equal(t, "", props.GetString("app.env2"))
val := props.ToString()
assert.Contains(t, val, "app.name")
assert.Contains(t, val, "app.program")
assert.Contains(t, val, "app.threads")
assert.Contains(t, val, "app.env1")
assert.Contains(t, val, "app.env2")
}
func TestLoadProperties_badContent(t *testing.T) {
filename, err := fs.TempFilenameWithText("hello")
assert.Nil(t, err)

View File

@@ -1,17 +0,0 @@
package contextx
import (
"context"
"time"
)
func ShrinkDeadline(ctx context.Context, timeout time.Duration) (context.Context, func()) {
if deadline, ok := ctx.Deadline(); ok {
leftTime := time.Until(deadline)
if leftTime < timeout {
timeout = leftTime
}
}
return context.WithDeadline(ctx, time.Now().Add(timeout))
}

View File

@@ -1,31 +0,0 @@
package contextx
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestShrinkDeadlineLess(t *testing.T) {
deadline := time.Now().Add(time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
ctx, cancel = ShrinkDeadline(ctx, time.Minute)
defer cancel()
dl, ok := ctx.Deadline()
assert.True(t, ok)
assert.Equal(t, deadline, dl)
}
func TestShrinkDeadlineMore(t *testing.T) {
deadline := time.Now().Add(time.Minute)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
ctx, cancel = ShrinkDeadline(ctx, time.Second)
defer cancel()
dl, ok := ctx.Deadline()
assert.True(t, ok)
assert.True(t, dl.Before(deadline))
}

View File

@@ -19,6 +19,7 @@ func (cv contextValuer) Value(key string) (interface{}, bool) {
return v, v != nil
}
// For unmarshals ctx into v.
func For(ctx context.Context, v interface{}) error {
return unmarshaler.UnmarshalValuer(contextValuer{
Context: ctx,

View File

@@ -21,6 +21,7 @@ func (valueOnlyContext) Err() error {
return nil
}
// ValueOnlyFrom takes all values from the given ctx, without deadline and error control.
func ValueOnlyFrom(ctx context.Context) context.Context {
return valueOnlyContext{
Context: ctx,

View File

@@ -14,6 +14,7 @@ const (
const timeToLive int64 = 10
// TimeToLive is seconds to live in etcd.
var TimeToLive = timeToLive
func extract(etcdKey string, index int) (string, bool) {

View File

@@ -28,6 +28,9 @@ func TestExtract(t *testing.T) {
_, ok = extract("any", -1)
assert.False(t, ok)
_, ok = extract("any", 10)
assert.False(t, ok)
}
func TestMakeKey(t *testing.T) {

View File

@@ -2,11 +2,13 @@ package discov
import "errors"
// EtcdConf is the config item with the given key on etcd.
type EtcdConf struct {
Hosts []string
Key string
}
// Validate validates c.
func (c EtcdConf) Validate() error {
if len(c.Hosts) == 0 {
return errors.New("empty etcd hosts")

View File

@@ -1,47 +0,0 @@
package discov
import (
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/logx"
)
type (
Facade struct {
endpoints []string
registry *internal.Registry
}
FacadeListener interface {
OnAdd(key, val string)
OnDelete(key string)
}
)
func NewFacade(endpoints []string) Facade {
return Facade{
endpoints: endpoints,
registry: internal.GetRegistry(),
}
}
func (f Facade) Client() internal.EtcdClient {
conn, err := f.registry.GetConn(f.endpoints)
logx.Must(err)
return conn
}
func (f Facade) Monitor(key string, l FacadeListener) {
f.registry.Monitor(f.endpoints, key, listenerAdapter{l})
}
type listenerAdapter struct {
l FacadeListener
}
func (la listenerAdapter) OnAdd(kv internal.KV) {
la.l.OnAdd(kv.Key, kv.Val)
}
func (la listenerAdapter) OnDelete(kv internal.KV) {
la.l.OnDelete(kv.Key)
}

View File

@@ -1,13 +1,15 @@
//go:generate mockgen -package internal -destination etcdclient_mock.go -source etcdclient.go EtcdClient
package internal
import (
"context"
"go.etcd.io/etcd/clientv3"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
)
// EtcdClient interface represents an etcd client.
type EtcdClient interface {
ActiveConnection() *grpc.ClientConn
Close() error

View File

@@ -6,10 +6,11 @@ package internal
import (
context "context"
gomock "github.com/golang/mock/gomock"
clientv3 "go.etcd.io/etcd/clientv3"
grpc "google.golang.org/grpc"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
clientv3 "go.etcd.io/etcd/client/v3"
grpc "google.golang.org/grpc"
)
// MockEtcdClient is a mock of EtcdClient interface

View File

@@ -1,5 +1,6 @@
package internal
// Listener interface wraps the OnUpdate method.
type Listener interface {
OnUpdate(keys []string, values []string, newKey string)
OnUpdate(keys, values []string, newKey string)
}

View File

@@ -14,23 +14,35 @@ import (
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
"go.etcd.io/etcd/clientv3"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
registryInstance = Registry{
registry = Registry{
clusters: make(map[string]*cluster),
}
connManager = syncx.NewResourceManager()
)
// A Registry is a registry that manages the etcd client connections.
type Registry struct {
clusters map[string]*cluster
lock sync.Mutex
}
// GetRegistry returns a global Registry.
func GetRegistry() *Registry {
return &registryInstance
return &registry
}
// GetConn returns an etcd client connection associated with given endpoints.
func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) {
return r.getCluster(endpoints).getClient()
}
// Monitor monitors the key on given etcd endpoints, notify with the given UpdateListener.
func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) error {
return r.getCluster(endpoints).monitor(key, l)
}
func (r *Registry) getCluster(endpoints []string) *cluster {
@@ -46,14 +58,6 @@ func (r *Registry) getCluster(endpoints []string) *cluster {
return c
}
func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) {
return r.getCluster(endpoints).getClient()
}
func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) error {
return r.getCluster(endpoints).monitor(key, l)
}
type cluster struct {
endpoints []string
key string
@@ -256,26 +260,34 @@ func (c *cluster) reload(cli EtcdClient) {
}
func (c *cluster) watch(cli EtcdClient, key string) {
for {
if c.watchStream(cli, key) {
return
}
}
}
func (c *cluster) watchStream(cli EtcdClient, key string) bool {
rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
for {
select {
case wresp, ok := <-rch:
if !ok {
logx.Error("etcd monitor chan has been closed")
return
return false
}
if wresp.Canceled {
logx.Error("etcd monitor chan has been canceled")
return
logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
return false
}
if wresp.Err() != nil {
logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
return
return false
}
c.handleWatchEvents(key, wresp.Events)
case <-c.done:
return
return true
}
}
}
@@ -288,6 +300,7 @@ func (c *cluster) watchConnState(cli EtcdClient) {
watcher.watch(cli.ActiveConnection())
}
// DialClient dials an etcd cluster with given endpoints.
func DialClient(endpoints []string) (EtcdClient, error) {
return clientv3.New(clientv3.Config{
Endpoints: endpoints,

View File

@@ -8,10 +8,11 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/contextx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stringx"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
var mockLock sync.Mutex
@@ -202,11 +203,13 @@ func TestClusterWatch_RespFailures(t *testing.T) {
restore := setMockClient(cli)
defer restore()
ch := make(chan clientv3.WatchResponse)
cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
c := new(cluster)
c.done = make(chan lang.PlaceholderType)
go func() {
ch <- resp
close(c.done)
}()
c.watch(cli, "any")
})
@@ -220,11 +223,13 @@ func TestClusterWatch_CloseChan(t *testing.T) {
restore := setMockClient(cli)
defer restore()
ch := make(chan clientv3.WatchResponse)
cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes()
cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
c := new(cluster)
c.done = make(chan lang.PlaceholderType)
go func() {
close(ch)
close(c.done)
}()
c.watch(cli, "any")
}

View File

@@ -1,4 +1,5 @@
//go:generate mockgen -package internal -destination statewatcher_mock.go -source statewatcher.go etcdConn
package internal
import (

View File

@@ -1,12 +1,15 @@
//go:generate mockgen -package internal -destination updatelistener_mock.go -source updatelistener.go UpdateListener
package internal
type (
// A KV is used to store an etcd entry with key and value.
KV struct {
Key string
Val string
}
// UpdateListener wraps the OnAdd and OnDelete methods.
UpdateListener interface {
OnAdd(kv KV)
OnDelete(kv KV)

View File

@@ -3,17 +3,22 @@ package internal
import "time"
const (
// Delimiter is a separator that separates the etcd path.
Delimiter = '/'
autoSyncInterval = time.Minute
coolDownInterval = time.Second
dialTimeout = 5 * time.Second
dialKeepAliveTime = 5 * time.Second
requestTimeout = 3 * time.Second
Delimiter = '/'
endpointsSeparator = ","
)
var (
DialTimeout = dialTimeout
// DialTimeout is the dial timeout.
DialTimeout = dialTimeout
// RequestTimeout is the request timeout.
RequestTimeout = requestTimeout
NewClient = DialClient
// NewClient is used to create etcd clients.
NewClient = DialClient
)

View File

@@ -0,0 +1,64 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: "etcd"
namespace: discov
labels:
app: "etcd"
spec:
serviceName: "etcd"
replicas: 5
template:
metadata:
name: "etcd"
labels:
app: "etcd"
spec:
volumes:
- name: etcd-pvc
persistentVolumeClaim:
claimName: etcd-pvc
containers:
- name: "etcd"
image: quay.io/coreos/etcd:latest
ports:
- containerPort: 2379
name: client
- containerPort: 2380
name: peer
env:
- name: CLUSTER_SIZE
value: "5"
- name: SET_NAME
value: "etcd"
- name: VOLNAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
volumeMounts:
- name: etcd-pvc
mountPath: /var/lib/etcd
subPathExpr: $(VOLNAME) # data mounted respectively in each pod
command:
- "/bin/sh"
- "-ecx"
- |
chmod 700 /var/lib/etcd
IP=$(hostname -i)
PEERS=""
for i in $(seq 0 $((${CLUSTER_SIZE} - 1))); do
PEERS="${PEERS}${PEERS:+,}${SET_NAME}-${i}=http://${SET_NAME}-${i}.${SET_NAME}:2380"
done
exec etcd --name ${HOSTNAME} \
--listen-peer-urls http://0.0.0.0:2380 \
--listen-client-urls http://0.0.0.0:2379 \
--advertise-client-urls http://${HOSTNAME}.${SET_NAME}.discov:2379 \
--initial-advertise-peer-urls http://${HOSTNAME}.${SET_NAME}:2380 \
--initial-cluster ${PEERS} \
--initial-cluster-state new \
--logger zap \
--data-dir /var/lib/etcd \
--auto-compaction-retention 1

View File

@@ -7,12 +7,14 @@ import (
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
"go.etcd.io/etcd/clientv3"
clientv3 "go.etcd.io/etcd/client/v3"
)
type (
// PublisherOption defines the method to customize a Publisher.
PublisherOption func(client *Publisher)
// A Publisher can be used to publish the value to an etcd cluster on the given key.
Publisher struct {
endpoints []string
key string
@@ -26,6 +28,10 @@ type (
}
)
// NewPublisher returns a Publisher.
// endpoints is the hosts of the etcd cluster.
// key:value are a pair to be published.
// opts are used to customize the Publisher.
func NewPublisher(endpoints []string, key, value string, opts ...PublisherOption) *Publisher {
publisher := &Publisher{
endpoints: endpoints,
@@ -43,6 +49,7 @@ func NewPublisher(endpoints []string, key, value string, opts ...PublisherOption
return publisher
}
// KeepAlive keeps key:value alive.
func (p *Publisher) KeepAlive() error {
cli, err := internal.GetRegistry().GetConn(p.endpoints)
if err != nil {
@@ -61,14 +68,17 @@ func (p *Publisher) KeepAlive() error {
return p.keepAliveAsync(cli)
}
// Pause pauses the renewing of key:value.
func (p *Publisher) Pause() {
p.pauseChan <- lang.Placeholder
}
// Resume resumes the renewing of key:value.
func (p *Publisher) Resume() {
p.resumeChan <- lang.Placeholder
}
// Stop stops the renewing and revokes the registration.
func (p *Publisher) Stop() {
p.quit.Close()
}
@@ -135,6 +145,7 @@ func (p *Publisher) revoke(cli internal.EtcdClient) {
}
}
// WithId customizes a Publisher with the id.
func WithId(id int64) PublisherOption {
return func(publisher *Publisher) {
publisher.id = id

View File

@@ -4,12 +4,14 @@ import (
"errors"
"sync"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"go.etcd.io/etcd/clientv3"
clientv3 "go.etcd.io/etcd/client/v3"
)
func init() {
@@ -152,3 +154,16 @@ func TestPublisher_keepAliveAsyncPause(t *testing.T) {
pub.Pause()
wg.Wait()
}
func TestPublisher_Resume(t *testing.T) {
publisher := new(Publisher)
publisher.resumeChan = make(chan lang.PlaceholderType)
go func() {
publisher.Resume()
}()
go func() {
time.Sleep(time.Minute)
t.Fail()
}()
<-publisher.resumeChan
}

View File

@@ -13,13 +13,19 @@ type (
exclusive bool
}
// SubOption defines the method to customize a Subscriber.
SubOption func(opts *subOptions)
// A Subscriber is used to subscribe the given key on a etcd cluster.
Subscriber struct {
items *container
}
)
// NewSubscriber returns a Subscriber.
// endpoints is the hosts of the etcd cluster.
// key is the key to subscribe.
// opts are used to customize the Subscriber.
func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
var subOpts subOptions
for _, opt := range opts {
@@ -36,15 +42,17 @@ func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscrib
return sub, nil
}
// AddListener adds listener to s.
func (s *Subscriber) AddListener(listener func()) {
s.items.addListener(listener)
}
// Values returns all the subscription values.
func (s *Subscriber) Values() []string {
return s.items.getValues()
}
// exclusive means that key value can only be 1:1,
// Exclusive means that key value can only be 1:1,
// which means later added value will remove the keys associated with the same value previously.
func Exclusive() SubOption {
return func(opts *subOptions) {
@@ -100,9 +108,9 @@ func (c *container) addKv(key, value string) ([]string, bool) {
if early {
return previous, true
} else {
return nil, false
}
return nil, false
}
func (c *container) addListener(listener func()) {

View File

@@ -1,6 +1,7 @@
package discov
import (
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
@@ -198,3 +199,18 @@ func TestContainer(t *testing.T) {
}
}
}
func TestSubscriber(t *testing.T) {
var opt subOptions
Exclusive()(&opt)
sub := new(Subscriber)
sub.items = newContainer(opt.exclusive)
var count int32
sub.AddListener(func() {
atomic.AddInt32(&count, 1)
})
sub.items.notifyChange()
assert.Empty(t, sub.Values())
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
}

View File

@@ -2,14 +2,17 @@ package errorx
import "sync/atomic"
// AtomicError defines an atomic error.
type AtomicError struct {
err atomic.Value // error
}
// Set sets the error.
func (ae *AtomicError) Set(err error) {
ae.err.Store(err)
}
// Load returns the error.
func (ae *AtomicError) Load() error {
if v := ae.err.Load(); v != nil {
return v.(error)

View File

@@ -3,6 +3,7 @@ package errorx
import "bytes"
type (
// A BatchError is an error that can hold multiple errors.
BatchError struct {
errs errorArray
}
@@ -10,12 +11,14 @@ type (
errorArray []error
)
// Add adds err to be.
func (be *BatchError) Add(err error) {
if err != nil {
be.errs = append(be.errs, err)
}
}
// Err returns an error that represents all errors.
func (be *BatchError) Err() error {
switch len(be.errs) {
case 0:
@@ -27,10 +30,12 @@ func (be *BatchError) Err() error {
}
}
// NotNil checks if any error inside.
func (be *BatchError) NotNil() bool {
return len(be.errs) > 0
}
// Error returns a string that represents inside errors.
func (ea errorArray) Error() string {
var buf bytes.Buffer

View File

@@ -1,5 +1,6 @@
package errorx
// Chain runs funs one by one until an error occurred.
func Chain(fns ...func() error) error {
for _, fn := range fns {
if err := fn(); err != nil {

View File

@@ -8,7 +8,7 @@ import (
)
func TestChain(t *testing.T) {
var errDummy = errors.New("dummy")
errDummy := errors.New("dummy")
assert.Nil(t, Chain(func() error {
return nil
}, func() error {

View File

@@ -5,8 +5,12 @@ import "time"
const defaultBulkTasks = 1000
type (
// BulkOption defines the method to customize a BulkExecutor.
BulkOption func(options *bulkOptions)
// A BulkExecutor is an executor that can execute tasks on either requirement meets:
// 1. up to given size of tasks
// 2. flush interval time elapsed
BulkExecutor struct {
executor *PeriodicalExecutor
container *bulkContainer
@@ -18,6 +22,7 @@ type (
}
)
// NewBulkExecutor returns a BulkExecutor.
func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
options := newBulkOptions()
for _, opt := range opts {
@@ -36,25 +41,30 @@ func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
return executor
}
// Add adds task into be.
func (be *BulkExecutor) Add(task interface{}) error {
be.executor.Add(task)
return nil
}
// Flush forces be to flush and execute tasks.
func (be *BulkExecutor) Flush() {
be.executor.Flush()
}
// Wait waits be to done with the task execution.
func (be *BulkExecutor) Wait() {
be.executor.Wait()
}
// WithBulkTasks customizes a BulkExecutor with given tasks limit.
func WithBulkTasks(tasks int) BulkOption {
return func(options *bulkOptions) {
options.cachedTasks = tasks
}
}
// WithBulkInterval customizes a BulkExecutor with given flush interval.
func WithBulkInterval(duration time.Duration) BulkOption {
return func(options *bulkOptions) {
options.flushInterval = duration

View File

@@ -5,8 +5,12 @@ import "time"
const defaultChunkSize = 1024 * 1024 // 1M
type (
// ChunkOption defines the method to customize a ChunkExecutor.
ChunkOption func(options *chunkOptions)
// A ChunkExecutor is an executor to execute tasks when either requirement meets:
// 1. up to given chunk size
// 2. flush interval elapsed
ChunkExecutor struct {
executor *PeriodicalExecutor
container *chunkContainer
@@ -18,6 +22,7 @@ type (
}
)
// NewChunkExecutor returns a ChunkExecutor.
func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor {
options := newChunkOptions()
for _, opt := range opts {
@@ -36,6 +41,7 @@ func NewChunkExecutor(execute Execute, opts ...ChunkOption) *ChunkExecutor {
return executor
}
// Add adds task with given chunk size into ce.
func (ce *ChunkExecutor) Add(task interface{}, size int) error {
ce.executor.Add(chunk{
val: task,
@@ -44,20 +50,24 @@ func (ce *ChunkExecutor) Add(task interface{}, size int) error {
return nil
}
// Flush forces ce to flush and execute tasks.
func (ce *ChunkExecutor) Flush() {
ce.executor.Flush()
}
// Wait waits the execution to be done.
func (ce *ChunkExecutor) Wait() {
ce.executor.Wait()
}
// WithChunkBytes customizes a ChunkExecutor with the given chunk size.
func WithChunkBytes(size int) ChunkOption {
return func(options *chunkOptions) {
options.chunkSize = size
}
}
// WithFlushInterval customizes a ChunkExecutor with the given flush interval.
func WithFlushInterval(duration time.Duration) ChunkOption {
return func(options *chunkOptions) {
options.flushInterval = duration

View File

@@ -7,6 +7,7 @@ import (
"github.com/tal-tech/go-zero/core/threading"
)
// A DelayExecutor delays a tasks on given delay interval.
type DelayExecutor struct {
fn func()
delay time.Duration
@@ -14,6 +15,7 @@ type DelayExecutor struct {
lock sync.Mutex
}
// NewDelayExecutor returns a DelayExecutor with given fn and delay.
func NewDelayExecutor(fn func(), delay time.Duration) *DelayExecutor {
return &DelayExecutor{
fn: fn,
@@ -21,6 +23,7 @@ func NewDelayExecutor(fn func(), delay time.Duration) *DelayExecutor {
}
}
// Trigger triggers the task to be executed after given delay, safe to trigger more than once.
func (de *DelayExecutor) Trigger() {
de.lock.Lock()
defer de.lock.Unlock()

View File

@@ -7,11 +7,13 @@ import (
"github.com/tal-tech/go-zero/core/timex"
)
// A LessExecutor is an executor to limit execution once within given time interval.
type LessExecutor struct {
threshold time.Duration
lastTime *syncx.AtomicDuration
}
// NewLessExecutor returns a LessExecutor with given threshold as time interval.
func NewLessExecutor(threshold time.Duration) *LessExecutor {
return &LessExecutor{
threshold: threshold,
@@ -19,6 +21,8 @@ func NewLessExecutor(threshold time.Duration) *LessExecutor {
}
}
// DoOrDiscard executes or discards the task depends on if
// another task was executed within the time interval.
func (le *LessExecutor) DoOrDiscard(execute func()) bool {
now := timex.Now()
lastTime := le.lastTime.Load()

View File

@@ -16,7 +16,7 @@ import (
const idleRound = 10
type (
// A type that satisfies executors.TaskContainer can be used as the underlying
// TaskContainer interface defines a type that can be used as the underlying
// container that used to do periodical executions.
TaskContainer interface {
// AddTask adds the task into the container.
@@ -28,6 +28,7 @@ type (
RemoveAll() interface{}
}
// A PeriodicalExecutor is an executor that periodically execute tasks.
PeriodicalExecutor struct {
commander chan interface{}
interval time.Duration
@@ -43,6 +44,7 @@ type (
}
)
// NewPeriodicalExecutor returns a PeriodicalExecutor with given interval and container.
func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
executor := &PeriodicalExecutor{
// buffer 1 to let the caller go quickly
@@ -51,7 +53,7 @@ func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *Per
container: container,
confirmChan: make(chan lang.PlaceholderType),
newTicker: func(d time.Duration) timex.Ticker {
return timex.NewTicker(interval)
return timex.NewTicker(d)
},
}
proc.AddShutdownListener(func() {
@@ -61,6 +63,7 @@ func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *Per
return executor
}
// Add adds tasks into pe.
func (pe *PeriodicalExecutor) Add(task interface{}) {
if vals, ok := pe.addAndCheck(task); ok {
pe.commander <- vals
@@ -68,6 +71,7 @@ func (pe *PeriodicalExecutor) Add(task interface{}) {
}
}
// Flush forces pe to execute tasks.
func (pe *PeriodicalExecutor) Flush() bool {
pe.enterExecution()
return pe.executeTasks(func() interface{} {
@@ -77,12 +81,14 @@ func (pe *PeriodicalExecutor) Flush() bool {
}())
}
// Sync lets caller to run fn thread-safe with pe, especially for the underlying container.
func (pe *PeriodicalExecutor) Sync(fn func()) {
pe.lock.Lock()
defer pe.lock.Unlock()
fn()
}
// Wait waits the execution to be done.
func (pe *PeriodicalExecutor) Wait() {
pe.Flush()
pe.wgBarrier.Guard(func() {

View File

@@ -4,4 +4,5 @@ import "time"
const defaultFlushInterval = time.Second
// Execute defines the method to execute tasks.
type Execute func(tasks []interface{})

View File

@@ -7,6 +7,7 @@ import (
const bufSize = 1024
// FirstLine returns the first line of the file.
func FirstLine(filename string) (string, error) {
file, err := os.Open(filename)
if err != nil {
@@ -17,6 +18,7 @@ func FirstLine(filename string) (string, error) {
return firstLine(file)
}
// LastLine returns the last line of the file.
func LastLine(filename string) (string, error) {
file, err := os.Open(filename)
if err != nil {
@@ -69,11 +71,11 @@ func lastLine(filename string, file *os.File) (string, error) {
if buf[n-1] == '\n' {
buf = buf[:n-1]
n -= 1
n--
} else {
buf = buf[:n]
}
for n -= 1; n >= 0; n-- {
for n--; n >= 0; n-- {
if buf[n] == '\n' {
return string(append(buf[n+1:], last...)), nil
}

View File

@@ -5,12 +5,15 @@ import (
"os"
)
// OffsetRange represents a content block of a file.
type OffsetRange struct {
File string
Start int64
Stop int64
}
// SplitLineChunks splits file into chunks.
// The whole line are guaranteed to be split in the same chunk.
func SplitLineChunks(filename string, chunks int) ([]OffsetRange, error) {
info, err := os.Stat(filename)
if err != nil {

View File

@@ -3,8 +3,11 @@ package filex
import "gopkg.in/cheggaaa/pb.v1"
type (
// A Scanner is used to read lines.
Scanner interface {
// Scan checks if has remaining to read.
Scan() bool
// Text returns next line.
Text() string
}
@@ -14,6 +17,7 @@ type (
}
)
// NewProgressScanner returns a Scanner with progress indicator.
func NewProgressScanner(scanner Scanner, bar *pb.ProgressBar) Scanner {
return &progressScanner{
Scanner: scanner,

View File

@@ -5,12 +5,14 @@ import (
"os"
)
// A RangeReader is used to read a range of content from a file.
type RangeReader struct {
file *os.File
start int64
stop int64
}
// NewRangeReader returns a RangeReader, which will read the range of content from file.
func NewRangeReader(file *os.File, start, stop int64) *RangeReader {
return &RangeReader{
file: file,
@@ -19,6 +21,7 @@ func NewRangeReader(file *os.File, start, stop int64) *RangeReader {
}
}
// Read reads the range of content into p.
func (rr *RangeReader) Read(p []byte) (n int, err error) {
stat, err := rr.file.Stat()
if err != nil {

View File

@@ -7,6 +7,7 @@ import (
"syscall"
)
// CloseOnExec makes sure closing the file on process forking.
func CloseOnExec(file *os.File) {
if file != nil {
syscall.CloseOnExec(int(file.Fd()))

View File

@@ -2,6 +2,7 @@ package fx
import "github.com/tal-tech/go-zero/core/threading"
// Parallel runs fns parallelly and waits for done.
func Parallel(fns ...func()) {
group := threading.NewRoutineGroup()
for _, fn := range fns {

View File

@@ -5,6 +5,7 @@ import "github.com/tal-tech/go-zero/core/errorx"
const defaultRetryTimes = 3
type (
// RetryOption defines the method to customize DoWithRetry.
RetryOption func(*retryOptions)
retryOptions struct {
@@ -12,8 +13,9 @@ type (
}
)
func DoWithRetries(fn func() error, opts ...RetryOption) error {
var options = newRetryOptions()
// DoWithRetry runs fn, and retries if failed. Default to retry 3 times.
func DoWithRetry(fn func() error, opts ...RetryOption) error {
options := newRetryOptions()
for _, opt := range opts {
opt(options)
}
@@ -30,7 +32,8 @@ func DoWithRetries(fn func() error, opts ...RetryOption) error {
return berr.Err()
}
func WithRetries(times int) RetryOption {
// WithRetry customize a DoWithRetry call with given retry times.
func WithRetry(times int) RetryOption {
return func(options *retryOptions) {
options.times = times
}

View File

@@ -8,12 +8,12 @@ import (
)
func TestRetry(t *testing.T) {
assert.NotNil(t, DoWithRetries(func() error {
assert.NotNil(t, DoWithRetry(func() error {
return errors.New("any")
}))
var times int
assert.Nil(t, DoWithRetries(func() error {
assert.Nil(t, DoWithRetry(func() error {
times++
if times == defaultRetryTimes {
return nil
@@ -22,7 +22,7 @@ func TestRetry(t *testing.T) {
}))
times = 0
assert.NotNil(t, DoWithRetries(func() error {
assert.NotNil(t, DoWithRetry(func() error {
times++
if times == defaultRetryTimes+1 {
return nil
@@ -30,13 +30,13 @@ func TestRetry(t *testing.T) {
return errors.New("any")
}))
var total = 2 * defaultRetryTimes
total := 2 * defaultRetryTimes
times = 0
assert.Nil(t, DoWithRetries(func() error {
assert.Nil(t, DoWithRetry(func() error {
times++
if times == total {
return nil
}
return errors.New("any")
}, WithRetries(total)))
}, WithRetry(total)))
}

View File

@@ -20,23 +20,40 @@ type (
workers int
}
FilterFunc func(item interface{}) bool
ForAllFunc func(pipe <-chan interface{})
ForEachFunc func(item interface{})
// FilterFunc defines the method to filter a Stream.
FilterFunc func(item interface{}) bool
// ForAllFunc defines the method to handle all elements in a Stream.
ForAllFunc func(pipe <-chan interface{})
// ForEachFunc defines the method to handle each element in a Stream.
ForEachFunc func(item interface{})
// GenerateFunc defines the method to send elements into a Stream.
GenerateFunc func(source chan<- interface{})
KeyFunc func(item interface{}) interface{}
LessFunc func(a, b interface{}) bool
MapFunc func(item interface{}) interface{}
Option func(opts *rxOptions)
// KeyFunc defines the method to generate keys for the elements in a Stream.
KeyFunc func(item interface{}) interface{}
// LessFunc defines the method to compare the elements in a Stream.
LessFunc func(a, b interface{}) bool
// MapFunc defines the method to map each element to another object in a Stream.
MapFunc func(item interface{}) interface{}
// Option defines the method to customize a Stream.
Option func(opts *rxOptions)
// ParallelFunc defines the method to handle elements parallelly.
ParallelFunc func(item interface{})
ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
WalkFunc func(item interface{}, pipe chan<- interface{})
// ReduceFunc defines the method to reduce all the elements in a Stream.
ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
// WalkFunc defines the method to walk through all the elements in a Stream.
WalkFunc func(item interface{}, pipe chan<- interface{})
// A Stream is a stream that can be used to do stream processing.
Stream struct {
source <-chan interface{}
}
)
// Concat returns a concatenated Stream.
func Concat(s Stream, others ...Stream) Stream {
return s.Concat(others...)
}
// From constructs a Stream from the given GenerateFunc.
func From(generate GenerateFunc) Stream {
source := make(chan interface{})
@@ -67,16 +84,42 @@ func Range(source <-chan interface{}) Stream {
}
}
// AllMach returns whether all elements of this stream match the provided predicate.
// May not evaluate the predicate on all elements if not necessary for determining the result.
// If the stream is empty then true is returned and the predicate is not evaluated.
func (s Stream) AllMach(predicate func(item interface{}) bool) bool {
for item := range s.source {
if !predicate(item) {
return false
}
}
return true
}
// AnyMach returns whether any elements of this stream match the provided predicate.
// May not evaluate the predicate on all elements if not necessary for determining the result.
// If the stream is empty then false is returned and the predicate is not evaluated.
func (s Stream) AnyMach(predicate func(item interface{}) bool) bool {
for item := range s.source {
if predicate(item) {
return true
}
}
return false
}
// Buffer buffers the items into a queue with size n.
// It can balance the producer and the consumer if their processing throughput don't match.
func (p Stream) Buffer(n int) Stream {
func (s Stream) Buffer(n int) Stream {
if n < 0 {
n = 0
}
source := make(chan interface{}, n)
go func() {
for item := range p.source {
for item := range s.source {
source <- item
}
close(source)
@@ -85,23 +128,51 @@ func (p Stream) Buffer(n int) Stream {
return Range(source)
}
// Concat returns a Stream that concatenated other streams
func (s Stream) Concat(others ...Stream) Stream {
source := make(chan interface{})
go func() {
group := threading.NewRoutineGroup()
group.Run(func() {
for item := range s.source {
source <- item
}
})
for _, each := range others {
each := each
group.Run(func() {
for item := range each.source {
source <- item
}
})
}
group.Wait()
close(source)
}()
return Range(source)
}
// Count counts the number of elements in the result.
func (p Stream) Count() (count int) {
for range p.source {
func (s Stream) Count() (count int) {
for range s.source {
count++
}
return
}
// Distinct removes the duplicated items base on the given KeyFunc.
func (p Stream) Distinct(fn KeyFunc) Stream {
func (s Stream) Distinct(fn KeyFunc) Stream {
source := make(chan interface{})
threading.GoSafe(func() {
defer close(source)
keys := make(map[interface{}]lang.PlaceholderType)
for item := range p.source {
for item := range s.source {
key := fn(item)
if _, ok := keys[key]; !ok {
source <- item
@@ -114,14 +185,14 @@ func (p Stream) Distinct(fn KeyFunc) Stream {
}
// Done waits all upstreaming operations to be done.
func (p Stream) Done() {
for range p.source {
func (s Stream) Done() {
for range s.source {
}
}
// Filter filters the items by the given FilterFunc.
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
return p.Walk(func(item interface{}, pipe chan<- interface{}) {
func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream {
return s.Walk(func(item interface{}, pipe chan<- interface{}) {
if fn(item) {
pipe <- item
}
@@ -129,21 +200,21 @@ func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
}
// ForAll handles the streaming elements from the source and no later streams.
func (p Stream) ForAll(fn ForAllFunc) {
fn(p.source)
func (s Stream) ForAll(fn ForAllFunc) {
fn(s.source)
}
// ForEach seals the Stream with the ForEachFunc on each item, no successive operations.
func (p Stream) ForEach(fn ForEachFunc) {
for item := range p.source {
func (s Stream) ForEach(fn ForEachFunc) {
for item := range s.source {
fn(item)
}
}
// Group groups the elements into different groups based on their keys.
func (p Stream) Group(fn KeyFunc) Stream {
func (s Stream) Group(fn KeyFunc) Stream {
groups := make(map[interface{}][]interface{})
for item := range p.source {
for item := range s.source {
key := fn(item)
groups[key] = append(groups[key], item)
}
@@ -159,7 +230,8 @@ func (p Stream) Group(fn KeyFunc) Stream {
return Range(source)
}
func (p Stream) Head(n int64) Stream {
// Head returns the first n elements in p.
func (s Stream) Head(n int64) Stream {
if n < 1 {
panic("n must be greater than 0")
}
@@ -167,7 +239,7 @@ func (p Stream) Head(n int64) Stream {
source := make(chan interface{})
go func() {
for item := range p.source {
for item := range s.source {
n--
if n >= 0 {
source <- item
@@ -187,17 +259,17 @@ func (p Stream) Head(n int64) Stream {
return Range(source)
}
// Maps converts each item to another corresponding item, which means it's a 1:1 model.
func (p Stream) Map(fn MapFunc, opts ...Option) Stream {
return p.Walk(func(item interface{}, pipe chan<- interface{}) {
// Map converts each item to another corresponding item, which means it's a 1:1 model.
func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
return s.Walk(func(item interface{}, pipe chan<- interface{}) {
pipe <- fn(item)
}, opts...)
}
// Merge merges all the items into a slice and generates a new stream.
func (p Stream) Merge() Stream {
func (s Stream) Merge() Stream {
var items []interface{}
for item := range p.source {
for item := range s.source {
items = append(items, item)
}
@@ -209,21 +281,21 @@ func (p Stream) Merge() Stream {
}
// Parallel applies the given ParallelFunc to each item concurrently with given number of workers.
func (p Stream) Parallel(fn ParallelFunc, opts ...Option) {
p.Walk(func(item interface{}, pipe chan<- interface{}) {
func (s Stream) Parallel(fn ParallelFunc, opts ...Option) {
s.Walk(func(item interface{}, pipe chan<- interface{}) {
fn(item)
}, opts...).Done()
}
// Reduce is a utility method to let the caller deal with the underlying channel.
func (p Stream) Reduce(fn ReduceFunc) (interface{}, error) {
return fn(p.source)
func (s Stream) Reduce(fn ReduceFunc) (interface{}, error) {
return fn(s.source)
}
// Reverse reverses the elements in the stream.
func (p Stream) Reverse() Stream {
func (s Stream) Reverse() Stream {
var items []interface{}
for item := range p.source {
for item := range s.source {
items = append(items, item)
}
// reverse, official method
@@ -235,10 +307,36 @@ func (p Stream) Reverse() Stream {
return Just(items...)
}
// Skip returns a Stream that skips size elements.
func (s Stream) Skip(n int64) Stream {
if n < 0 {
panic("n must not be negative")
}
if n == 0 {
return s
}
source := make(chan interface{})
go func() {
for item := range s.source {
n--
if n >= 0 {
continue
} else {
source <- item
}
}
close(source)
}()
return Range(source)
}
// Sort sorts the items from the underlying source.
func (p Stream) Sort(less LessFunc) Stream {
func (s Stream) Sort(less LessFunc) Stream {
var items []interface{}
for item := range p.source {
for item := range s.source {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
@@ -250,7 +348,7 @@ func (p Stream) Sort(less LessFunc) Stream {
// Split splits the elements into chunk with size up to n,
// might be less than n on tailing elements.
func (p Stream) Split(n int) Stream {
func (s Stream) Split(n int) Stream {
if n < 1 {
panic("n should be greater than 0")
}
@@ -258,7 +356,7 @@ func (p Stream) Split(n int) Stream {
source := make(chan interface{})
go func() {
var chunk []interface{}
for item := range p.source {
for item := range s.source {
chunk = append(chunk, item)
if len(chunk) == n {
source <- chunk
@@ -274,7 +372,8 @@ func (p Stream) Split(n int) Stream {
return Range(source)
}
func (p Stream) Tail(n int64) Stream {
// Tail returns the last n elements in p.
func (s Stream) Tail(n int64) Stream {
if n < 1 {
panic("n should be greater than 0")
}
@@ -283,7 +382,7 @@ func (p Stream) Tail(n int64) Stream {
go func() {
ring := collection.NewRing(int(n))
for item := range p.source {
for item := range s.source {
ring.Add(item)
}
for _, item := range ring.Take() {
@@ -296,16 +395,16 @@ func (p Stream) Tail(n int64) Stream {
}
// Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item.
func (p Stream) Walk(fn WalkFunc, opts ...Option) Stream {
func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream {
option := buildOptions(opts...)
if option.unlimitedWorkers {
return p.walkUnlimited(fn, option)
} else {
return p.walkLimited(fn, option)
return s.walkUnlimited(fn, option)
}
return s.walkLimited(fn, option)
}
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
pipe := make(chan interface{}, option.workers)
go func() {
@@ -314,7 +413,7 @@ func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
for {
pool <- lang.Placeholder
item, ok := <-p.source
item, ok := <-s.source
if !ok {
<-pool
break
@@ -339,14 +438,14 @@ func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
return Range(pipe)
}
func (p Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream {
pipe := make(chan interface{}, defaultWorkers)
go func() {
var wg sync.WaitGroup
for {
item, ok := <-p.source
item, ok := <-s.source
if !ok {
break
}

View File

@@ -3,7 +3,10 @@ package fx
import (
"io/ioutil"
"log"
"math/rand"
"reflect"
"runtime"
"sort"
"sync"
"sync/atomic"
"testing"
@@ -330,6 +333,29 @@ func TestWalk(t *testing.T) {
assert.Equal(t, 9, result)
}
func BenchmarkParallelMapReduce(b *testing.B) {
b.ReportAllocs()
mapper := func(v interface{}) interface{} {
return v.(int64) * v.(int64)
}
reducer := func(input <-chan interface{}) (interface{}, error) {
var result int64
for v := range input {
result += v.(int64)
}
return result, nil
}
b.ResetTimer()
From(func(input chan<- interface{}) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
input <- int64(rand.Int())
}
})
}).Map(mapper).Reduce(reducer)
}
func BenchmarkMapReduce(b *testing.B) {
b.ReportAllocs()
@@ -343,12 +369,103 @@ func BenchmarkMapReduce(b *testing.B) {
}
return result, nil
}
b.ResetTimer()
From(func(input chan<- interface{}) {
for i := 0; i < b.N; i++ {
input <- int64(rand.Int())
}
}).Map(mapper).Reduce(reducer)
}
for i := 0; i < b.N; i++ {
From(func(input chan<- interface{}) {
for j := 0; j < 2; j++ {
input <- int64(j)
}
}).Map(mapper).Reduce(reducer)
func equal(t *testing.T, stream Stream, data []interface{}) {
items := make([]interface{}, 0)
for item := range stream.source {
items = append(items, item)
}
if !reflect.DeepEqual(items, data) {
t.Errorf(" %v, want %v", items, data)
}
}
func assetEqual(t *testing.T, except, data interface{}) {
if !reflect.DeepEqual(except, data) {
t.Errorf(" %v, want %v", data, except)
}
}
func TestStream_AnyMach(t *testing.T) {
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 4 == item.(int)
}))
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 0 == item.(int)
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 2 == item.(int)
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 2 == item.(int)
}))
}
func TestStream_AllMach(t *testing.T) {
assetEqual(
t, true, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return true
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return false
}),
)
assetEqual(
t, false, Just(1, 2, 3).AllMach(func(item interface{}) bool {
return item.(int) == 1
}),
)
}
func TestConcat(t *testing.T) {
a1 := []interface{}{1, 2, 3}
a2 := []interface{}{4, 5, 6}
s1 := Just(a1...)
s2 := Just(a2...)
stream := Concat(s1, s2)
var items []interface{}
for item := range stream.source {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
return items[i].(int) < items[j].(int)
})
ints := make([]interface{}, 0)
ints = append(ints, a1...)
ints = append(ints, a2...)
assetEqual(t, ints, items)
}
func TestStream_Skip(t *testing.T) {
assetEqual(t, 3, Just(1, 2, 3, 4).Skip(1).Count())
assetEqual(t, 1, Just(1, 2, 3, 4).Skip(3).Count())
assetEqual(t, 4, Just(1, 2, 3, 4).Skip(0).Count())
equal(t, Just(1, 2, 3, 4).Skip(3), []interface{}{4})
assert.Panics(t, func() {
Just(1, 2, 3, 4).Skip(-1)
})
}
func TestStream_Concat(t *testing.T) {
stream := Just(1).Concat(Just(2), Just(3))
var items []interface{}
for item := range stream.source {
items = append(items, item)
}
sort.Slice(items, func(i, j int) bool {
return items[i].(int) < items[j].(int)
})
assetEqual(t, []interface{}{1, 2, 3}, items)
just := Just(1)
equal(t, just.Concat(just), []interface{}{1})
}

View File

@@ -6,13 +6,17 @@ import (
)
var (
// ErrCanceled is the error returned when the context is canceled.
ErrCanceled = context.Canceled
ErrTimeout = context.DeadlineExceeded
// ErrTimeout is the error returned when the context's deadline passes.
ErrTimeout = context.DeadlineExceeded
)
type FxOption func() context.Context
// DoOption defines the method to customize a DoWithTimeout call.
type DoOption func() context.Context
func DoWithTimeout(fn func() error, timeout time.Duration, opts ...FxOption) error {
// DoWithTimeout runs fn with timeout control.
func DoWithTimeout(fn func() error, timeout time.Duration, opts ...DoOption) error {
parentCtx := context.Background()
for _, opt := range opts {
parentCtx = opt()
@@ -20,7 +24,8 @@ func DoWithTimeout(fn func() error, timeout time.Duration, opts ...FxOption) err
ctx, cancel := context.WithTimeout(parentCtx, timeout)
defer cancel()
done := make(chan error)
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
panicChan := make(chan interface{}, 1)
go func() {
defer func() {
@@ -29,7 +34,6 @@ func DoWithTimeout(fn func() error, timeout time.Duration, opts ...FxOption) err
}
}()
done <- fn()
close(done)
}()
select {
@@ -42,7 +46,8 @@ func DoWithTimeout(fn func() error, timeout time.Duration, opts ...FxOption) err
}
}
func WithContext(ctx context.Context) FxOption {
// WithContext customizes a DoWithTimeout call with given ctx.
func WithContext(ctx context.Context) DoOption {
return func() context.Context {
return ctx
}

View File

@@ -11,6 +11,7 @@ import (
)
const (
// TopWeight is the top weight that one entry might set.
TopWeight = 100
minReplicas = 100
@@ -18,10 +19,12 @@ const (
)
type (
HashFunc func(data []byte) uint64
// Func defines the hash method.
Func func(data []byte) uint64
// A ConsistentHash is a ring hash implementation.
ConsistentHash struct {
hashFunc HashFunc
hashFunc Func
replicas int
keys []uint64
ring map[uint64][]interface{}
@@ -30,11 +33,13 @@ type (
}
)
// NewConsistentHash returns a ConsistentHash.
func NewConsistentHash() *ConsistentHash {
return NewCustomConsistentHash(minReplicas, Hash)
}
func NewCustomConsistentHash(replicas int, fn HashFunc) *ConsistentHash {
// NewCustomConsistentHash returns a ConsistentHash with given replicas and hash func.
func NewCustomConsistentHash(replicas int, fn Func) *ConsistentHash {
if replicas < minReplicas {
replicas = minReplicas
}
@@ -78,7 +83,7 @@ func (h *ConsistentHash) AddWithReplicas(node interface{}, replicas int) {
h.ring[hash] = append(h.ring[hash], node)
}
sort.Slice(h.keys, func(i int, j int) bool {
sort.Slice(h.keys, func(i, j int) bool {
return h.keys[i] < h.keys[j]
})
}
@@ -92,6 +97,7 @@ func (h *ConsistentHash) AddWithWeight(node interface{}, weight int) {
h.AddWithReplicas(node, replicas)
}
// Get returns the corresponding node from h base on the given v.
func (h *ConsistentHash) Get(v interface{}) (interface{}, bool) {
h.lock.RLock()
defer h.lock.RUnlock()
@@ -118,6 +124,7 @@ func (h *ConsistentHash) Get(v interface{}) (interface{}, bool) {
}
}
// Remove removes the given node from h.
func (h *ConsistentHash) Remove(node interface{}) {
nodeRepr := repr(node)
@@ -133,7 +140,7 @@ func (h *ConsistentHash) Remove(node interface{}) {
index := sort.Search(len(h.keys), func(i int) bool {
return h.keys[i] >= hash
})
if index < len(h.keys) {
if index < len(h.keys) && h.keys[index] == hash {
h.keys = append(h.keys[:index], h.keys[index+1:]...)
}
h.removeRingNode(hash, nodeRepr)

View File

@@ -132,8 +132,8 @@ func TestConsistentHash_RemoveInterface(t *testing.T) {
assert.Equal(t, 1, len(ch.nodes))
node, ok := ch.Get(1)
assert.True(t, ok)
assert.Equal(t, key, node.(*MockNode).Addr)
assert.Equal(t, 2, node.(*MockNode).Id)
assert.Equal(t, key, node.(*mockNode).addr)
assert.Equal(t, 2, node.(*mockNode).id)
}
func getKeysBeforeAndAfterFailure(t *testing.T, prefix string, index int) (map[int]string, map[int]string) {
@@ -164,18 +164,18 @@ func getKeysBeforeAndAfterFailure(t *testing.T, prefix string, index int) (map[i
return keys, newKeys
}
type MockNode struct {
Addr string
Id int
type mockNode struct {
addr string
id int
}
func newMockNode(addr string, id int) *MockNode {
return &MockNode{
Addr: addr,
Id: id,
func newMockNode(addr string, id int) *mockNode {
return &mockNode{
addr: addr,
id: id,
}
}
func (n *MockNode) String() string {
return n.Addr
func (n *mockNode) String() string {
return n.addr
}

View File

@@ -7,16 +7,19 @@ import (
"github.com/spaolacci/murmur3"
)
// Hash returns the hash value of data.
func Hash(data []byte) uint64 {
return murmur3.Sum64(data)
}
// Md5 returns the md5 bytes of data.
func Md5(data []byte) []byte {
digest := md5.New()
digest.Write(data)
return digest.Sum(nil)
}
// Md5Hex returns the md5 hex string of data.
func Md5Hex(data []byte) string {
return fmt.Sprintf("%x", Md5(data))
}

View File

@@ -5,11 +5,13 @@ import (
"sync"
)
// A BufferPool is a pool to buffer bytes.Buffer objects.
type BufferPool struct {
capability int
pool *sync.Pool
}
// NewBufferPool returns a BufferPool.
func NewBufferPool(capability int) *BufferPool {
return &BufferPool{
capability: capability,
@@ -21,12 +23,14 @@ func NewBufferPool(capability int) *BufferPool {
}
}
// Get returns a bytes.Buffer object from bp.
func (bp *BufferPool) Get() *bytes.Buffer {
buf := bp.pool.Get().(*bytes.Buffer)
buf.Reset()
return buf
}
// Put returns buf into bp.
func (bp *BufferPool) Put(buf *bytes.Buffer) {
if buf.Cap() < bp.capability {
bp.pool.Put(buf)

View File

@@ -10,6 +10,7 @@ func (nopCloser) Close() error {
return nil
}
// NopCloser returns a io.WriteCloser that does nothing on calling Close.
func NopCloser(w io.Writer) io.WriteCloser {
return nopCloser{w}
}

View File

@@ -16,9 +16,11 @@ type (
omitPrefix string
}
// TextReadOption defines the method to customize the text reading functions.
TextReadOption func(*textReadOptions)
)
// DupReadCloser returns two io.ReadCloser that read from the first will be written to the second.
// The first returned reader needs to be read first, because the content
// read from it will be written to the underlying buffer of the second reader.
func DupReadCloser(reader io.ReadCloser) (io.ReadCloser, io.ReadCloser) {
@@ -27,6 +29,7 @@ func DupReadCloser(reader io.ReadCloser) (io.ReadCloser, io.ReadCloser) {
return ioutil.NopCloser(tee), ioutil.NopCloser(&buf)
}
// KeepSpace customizes the reading functions to keep leading and tailing spaces.
func KeepSpace() TextReadOption {
return func(o *textReadOptions) {
o.keepSpace = true
@@ -49,6 +52,7 @@ func ReadBytes(reader io.Reader, buf []byte) error {
return nil
}
// ReadText reads content from the given file with leading and tailing spaces trimmed.
func ReadText(filename string) (string, error) {
content, err := ioutil.ReadFile(filename)
if err != nil {
@@ -58,6 +62,7 @@ func ReadText(filename string) (string, error) {
return strings.TrimSpace(string(content)), nil
}
// ReadTextLines reads the text lines from given file.
func ReadTextLines(filename string, opts ...TextReadOption) ([]string, error) {
var readOpts textReadOptions
for _, opt := range opts {
@@ -90,12 +95,14 @@ func ReadTextLines(filename string, opts ...TextReadOption) ([]string, error) {
return lines, scanner.Err()
}
// WithoutBlank customizes the reading functions to ignore blank lines.
func WithoutBlank() TextReadOption {
return func(o *textReadOptions) {
o.withoutBlanks = true
}
}
// OmitWithPrefix customizes the reading functions to ignore the lines with given leading prefix.
func OmitWithPrefix(prefix string) TextReadOption {
return func(o *textReadOptions) {
o.omitPrefix = prefix

View File

@@ -8,6 +8,7 @@ import (
const bufSize = 32 * 1024
// CountLines returns the number of lines in file.
func CountLines(file string) (int, error) {
f, err := os.Open(file)
if err != nil {

View File

@@ -6,6 +6,7 @@ import (
"strings"
)
// A TextLineScanner is a scanner that can scan lines from given reader.
type TextLineScanner struct {
reader *bufio.Reader
hasNext bool
@@ -13,6 +14,7 @@ type TextLineScanner struct {
err error
}
// NewTextLineScanner returns a TextLineScanner with given reader.
func NewTextLineScanner(reader io.Reader) *TextLineScanner {
return &TextLineScanner{
reader: bufio.NewReader(reader),
@@ -20,6 +22,7 @@ func NewTextLineScanner(reader io.Reader) *TextLineScanner {
}
}
// Scan checks if scanner has more lines to read.
func (scanner *TextLineScanner) Scan() bool {
if !scanner.hasNext {
return false
@@ -37,6 +40,7 @@ func (scanner *TextLineScanner) Scan() bool {
return true
}
// Line returns the next available line.
func (scanner *TextLineScanner) Line() (string, error) {
return scanner.line, scanner.err
}

View File

@@ -7,32 +7,38 @@ import (
"github.com/globalsign/mgo/bson"
)
// MilliTime represents time.Time that works better with mongodb.
type MilliTime struct {
time.Time
}
// MarshalJSON marshals mt to json bytes.
func (mt MilliTime) MarshalJSON() ([]byte, error) {
return json.Marshal(mt.Milli())
}
// UnmarshalJSON unmarshals data into mt.
func (mt *MilliTime) UnmarshalJSON(data []byte) error {
var milli int64
if err := json.Unmarshal(data, &milli); err != nil {
return err
} else {
mt.Time = time.Unix(0, milli*int64(time.Millisecond))
return nil
}
mt.Time = time.Unix(0, milli*int64(time.Millisecond))
return nil
}
// GetBSON returns BSON base on mt.
func (mt MilliTime) GetBSON() (interface{}, error) {
return mt.Time, nil
}
// SetBSON sets raw into mt.
func (mt *MilliTime) SetBSON(raw bson.Raw) error {
return raw.Unmarshal(&mt.Time)
}
// Milli returns milliseconds for mt.
func (mt MilliTime) Milli() int64 {
return mt.UnixNano() / int64(time.Millisecond)
}

View File

@@ -8,10 +8,12 @@ import (
"strings"
)
// Marshal marshals v into json bytes.
func Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
// Unmarshal unmarshals data bytes into v.
func Unmarshal(data []byte, v interface{}) error {
decoder := json.NewDecoder(bytes.NewReader(data))
if err := unmarshalUseNumber(decoder, v); err != nil {
@@ -21,6 +23,7 @@ func Unmarshal(data []byte, v interface{}) error {
return nil
}
// UnmarshalFromString unmarshals v from str.
func UnmarshalFromString(str string, v interface{}) error {
decoder := json.NewDecoder(strings.NewReader(str))
if err := unmarshalUseNumber(decoder, v); err != nil {
@@ -30,6 +33,7 @@ func UnmarshalFromString(str string, v interface{}) error {
return nil
}
// UnmarshalFromReader unmarshals v from reader.
func UnmarshalFromReader(reader io.Reader, v interface{}) error {
var buf strings.Builder
teeReader := io.TeeReader(reader, &buf)

View File

@@ -1,8 +1,11 @@
package lang
// Placeholder is a placeholder object that can be used globally.
var Placeholder PlaceholderType
type (
GenericType = interface{}
// GenericType can be used to hold any type.
GenericType = interface{}
// PlaceholderType represents a placeholder type.
PlaceholderType = struct{}
)

View File

@@ -27,9 +27,13 @@ end`
)
const (
// Unknown means not initialized state.
Unknown = iota
// Allowed means allowed state.
Allowed
// HitQuota means this request exactly hit the quota.
HitQuota
// OverQuota means passed the quota.
OverQuota
internalOverQuota = 0
@@ -37,11 +41,14 @@ const (
internalHitQuota = 2
)
// ErrUnknownCode is an error that represents unknown status code.
var ErrUnknownCode = errors.New("unknown status code")
type (
LimitOption func(l *PeriodLimit)
// PeriodOption defines the method to customize a PeriodLimit.
PeriodOption func(l *PeriodLimit)
// A PeriodLimit is used to limit requests during a period of time.
PeriodLimit struct {
period int
quota int
@@ -51,8 +58,9 @@ type (
}
)
// NewPeriodLimit returns a PeriodLimit with given parameters.
func NewPeriodLimit(period, quota int, limitStore *redis.Redis, keyPrefix string,
opts ...LimitOption) *PeriodLimit {
opts ...PeriodOption) *PeriodLimit {
limiter := &PeriodLimit{
period: period,
quota: quota,
@@ -67,6 +75,7 @@ func NewPeriodLimit(period, quota int, limitStore *redis.Redis, keyPrefix string
return limiter
}
// Take requests a permit, it returns the permit state.
func (h *PeriodLimit) Take(key string) (int, error) {
resp, err := h.limitStore.Eval(periodScript, []string{h.keyPrefix + key}, []string{
strconv.Itoa(h.quota),
@@ -97,12 +106,13 @@ func (h *PeriodLimit) calcExpireSeconds() int {
if h.align {
unix := time.Now().Unix() + zoneDiff
return h.period - int(unix%int64(h.period))
} else {
return h.period
}
return h.period
}
func Align() LimitOption {
// Align returns a func to customize a PeriodLimit with alignment.
func Align() PeriodOption {
return func(l *PeriodLimit) {
l.align = true
}

View File

@@ -33,7 +33,7 @@ func TestPeriodLimit_RedisUnavailable(t *testing.T) {
assert.Equal(t, 0, val)
}
func testPeriodLimit(t *testing.T, opts ...LimitOption) {
func testPeriodLimit(t *testing.T, opts ...PeriodOption) {
store, clean, err := redistest.CreateRedis()
assert.Nil(t, err)
defer clean()

View File

@@ -26,6 +26,7 @@ const (
)
var (
// ErrServiceOverloaded is returned by Shedder.Allow when the service is overloaded.
ErrServiceOverloaded = errors.New("service overloaded")
// default to be enabled
@@ -37,15 +38,22 @@ var (
)
type (
// A Promise interface is returned by Shedder.Allow to let callers tell
// whether the processing request is successful or not.
Promise interface {
// Pass lets the caller tell that the call is successful.
Pass()
// Fail lets the caller tell that the call is failed.
Fail()
}
// Shedder is the interface that wraps the Allow method.
Shedder interface {
// Allow returns the Promise if allowed, otherwise ErrServiceOverloaded.
Allow() (Promise, error)
}
// ShedderOption lets caller customize the Shedder.
ShedderOption func(opts *shedderOptions)
shedderOptions struct {
@@ -67,10 +75,13 @@ type (
}
)
// Disable lets callers disable load shedding.
func Disable() {
enabled.Set(false)
}
// NewAdaptiveShedder returns an adaptive shedder.
// opts can be used to customize the Shedder.
func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
if !enabled.True() {
return newNopShedder()
@@ -97,6 +108,7 @@ func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
}
}
// Allow implements Shedder.Allow.
func (as *adaptiveShedder) Allow() (Promise, error) {
if as.shouldDrop() {
as.dropTime.Set(timex.Now())
@@ -156,7 +168,7 @@ func (as *adaptiveShedder) maxPass() int64 {
}
func (as *adaptiveShedder) minRt() float64 {
var result = defaultMinRt
result := defaultMinRt
as.rtCounter.Reduce(func(b *collection.Bucket) {
if b.Count <= 0 {
@@ -213,18 +225,21 @@ func (as *adaptiveShedder) systemOverloaded() bool {
return systemOverloadChecker(as.cpuThreshold)
}
// WithBuckets customizes the Shedder with given number of buckets.
func WithBuckets(buckets int) ShedderOption {
return func(opts *shedderOptions) {
opts.buckets = buckets
}
}
// WithCpuThreshold customizes the Shedder with given cpu threshold.
func WithCpuThreshold(threshold int64) ShedderOption {
return func(opts *shedderOptions) {
opts.cpuThreshold = threshold
}
}
// WithWindow customizes the Shedder with given
func WithWindow(window time.Duration) ShedderOption {
return func(opts *shedderOptions) {
opts.window = window

View File

@@ -201,7 +201,7 @@ func BenchmarkAdaptiveShedder_Allow(b *testing.B) {
logx.Disable()
bench := func(b *testing.B) {
var shedder = NewAdaptiveShedder()
shedder := NewAdaptiveShedder()
proba := mathx.NewProba()
for i := 0; i < 6000; i++ {
p, err := shedder.Allow()

View File

@@ -1,7 +1,6 @@
package load
type nopShedder struct {
}
type nopShedder struct{}
func newNopShedder() Shedder {
return nopShedder{}
@@ -11,8 +10,7 @@ func (s nopShedder) Allow() (Promise, error) {
return nopPromise{}, nil
}
type nopPromise struct {
}
type nopPromise struct{}
func (p nopPromise) Pass() {
}

View File

@@ -6,11 +6,13 @@ import (
"github.com/tal-tech/go-zero/core/syncx"
)
// A ShedderGroup is a manager to manage key based shedders.
type ShedderGroup struct {
options []ShedderOption
manager *syncx.ResourceManager
}
// NewShedderGroup returns a ShedderGroup.
func NewShedderGroup(opts ...ShedderOption) *ShedderGroup {
return &ShedderGroup{
options: opts,
@@ -18,6 +20,7 @@ func NewShedderGroup(opts ...ShedderOption) *ShedderGroup {
}
}
// GetShedder gets the Shedder for the given key.
func (g *ShedderGroup) GetShedder(key string) Shedder {
shedder, _ := g.manager.GetResource(key, func() (closer io.Closer, e error) {
return nopCloser{

View File

@@ -9,6 +9,7 @@ import (
)
type (
// A SheddingStat is used to store the statistics for load shedding.
SheddingStat struct {
name string
total int64
@@ -23,6 +24,7 @@ type (
}
)
// NewSheddingStat returns a SheddingStat.
func NewSheddingStat(name string) *SheddingStat {
st := &SheddingStat{
name: name,
@@ -31,14 +33,17 @@ func NewSheddingStat(name string) *SheddingStat {
return st
}
// IncrementTotal increments the total requests.
func (s *SheddingStat) IncrementTotal() {
atomic.AddInt64(&s.total, 1)
}
// IncrementPass increments the passed requests.
func (s *SheddingStat) IncrementPass() {
atomic.AddInt64(&s.pass, 1)
}
// IncrementDrop increments the dropped requests.
func (s *SheddingStat) IncrementDrop() {
atomic.AddInt64(&s.drop, 1)
}

View File

@@ -1,8 +1,10 @@
package logx
// A LogConf is a logging config.
type LogConf struct {
ServiceName string `json:",optional"`
Mode string `json:",default=console,options=console|file|volume"`
TimeFormat string `json:",optional"`
Path string `json:",default=logs"`
Level string `json:",default=info,options=info|error|severe"`
Compress bool `json:",optional"`

View File

@@ -12,6 +12,7 @@ const durationCallerDepth = 3
type durationLogger logEntry
// WithDuration returns a Logger which logs the given duration.
func WithDuration(d time.Duration) Logger {
return &durationLogger{
Duration: timex.ReprOfDuration(d),

View File

@@ -1,21 +1,25 @@
package logx
// A LessLogger is a logger that control to log once during the given duration.
type LessLogger struct {
*limitedExecutor
}
// NewLessLogger returns a LessLogger.
func NewLessLogger(milliseconds int) *LessLogger {
return &LessLogger{
limitedExecutor: newLimitedExecutor(milliseconds),
}
}
// Error logs v into error log or discard it if more than once in the given duration.
func (logger *LessLogger) Error(v ...interface{}) {
logger.logOrDiscard(func() {
Error(v...)
})
}
// Errorf logs v with format into error log or discard it if more than once in the given duration.
func (logger *LessLogger) Errorf(format string, v ...interface{}) {
logger.logOrDiscard(func() {
Errorf(format, v...)

View File

@@ -7,7 +7,7 @@ type lessWriter struct {
writer io.Writer
}
func NewLessWriter(writer io.Writer, milliseconds int) *lessWriter {
func newLessWriter(writer io.Writer, milliseconds int) *lessWriter {
return &lessWriter{
limitedExecutor: newLimitedExecutor(milliseconds),
writer: writer,

View File

@@ -9,7 +9,7 @@ import (
func TestLessWriter(t *testing.T) {
var builder strings.Builder
w := NewLessWriter(&builder, 500)
w := newLessWriter(&builder, 500)
for i := 0; i < 100; i++ {
_, err := w.Write([]byte("hello"))
assert.Nil(t, err)

View File

@@ -32,8 +32,6 @@ const (
)
const (
timeFormat = "2006-01-02T15:04:05.000Z07"
accessFilename = "access.log"
errorFilename = "error.log"
severeFilename = "severe.log"
@@ -57,10 +55,14 @@ const (
)
var (
ErrLogPathNotSet = errors.New("log path must be set")
ErrLogNotInitialized = errors.New("log not initialized")
// ErrLogPathNotSet is an error that indicates the log path is not set.
ErrLogPathNotSet = errors.New("log path must be set")
// ErrLogNotInitialized is an error that log is not initialized.
ErrLogNotInitialized = errors.New("log not initialized")
// ErrLogServiceNameNotSet is an error that indicates that the service name is not set.
ErrLogServiceNameNotSet = errors.New("log service name must be set")
timeFormat = "2006-01-02T15:04:05.000Z07"
writeConsole bool
logLevel uint32
infoLog io.WriteCloser
@@ -89,8 +91,10 @@ type (
keepDays int
}
// LogOption defines the method to customize the logging.
LogOption func(options *logOptions)
// A Logger represents a logger.
Logger interface {
Error(...interface{})
Errorf(string, ...interface{})
@@ -102,6 +106,7 @@ type (
}
)
// MustSetup sets up logging with given config c. It exits on error.
func MustSetup(c LogConf) {
Must(SetUp(c))
}
@@ -111,6 +116,10 @@ func MustSetup(c LogConf) {
// we need to allow different service frameworks to initialize logx respectively.
// the same logic for SetUp
func SetUp(c LogConf) error {
if len(c.TimeFormat) > 0 {
timeFormat = c.TimeFormat
}
switch c.Mode {
case consoleMode:
setupWithConsole(c)
@@ -122,10 +131,12 @@ func SetUp(c LogConf) error {
}
}
// Alert alerts v in alert level, and the message is written to error log.
func Alert(v string) {
output(errorLog, levelAlert, v)
}
// Close closes the logging.
func Close() error {
if writeConsole {
return nil
@@ -170,6 +181,7 @@ func Close() error {
return nil
}
// Disable disables the logging.
func Disable() {
once.Do(func() {
atomic.StoreUint32(&initialized, 1)
@@ -183,40 +195,49 @@ func Disable() {
})
}
// Error writes v into error log.
func Error(v ...interface{}) {
ErrorCaller(1, v...)
}
// Errorf writes v with format into error log.
func Errorf(format string, v ...interface{}) {
ErrorCallerf(1, format, v...)
}
// ErrorCaller writes v with context into error log.
func ErrorCaller(callDepth int, v ...interface{}) {
errorSync(fmt.Sprint(v...), callDepth+callerInnerDepth)
}
// ErrorCallerf writes v with context in format into error log.
func ErrorCallerf(callDepth int, format string, v ...interface{}) {
errorSync(fmt.Sprintf(format, v...), callDepth+callerInnerDepth)
}
// ErrorStack writes v along with call stack into error log.
func ErrorStack(v ...interface{}) {
// there is newline in stack string
stackSync(fmt.Sprint(v...))
}
// ErrorStackf writes v along with call stack in format into error log.
func ErrorStackf(format string, v ...interface{}) {
// there is newline in stack string
stackSync(fmt.Sprintf(format, v...))
}
// Info writes v into access log.
func Info(v ...interface{}) {
infoSync(fmt.Sprint(v...))
}
// Infof writes v with format into access log.
func Infof(format string, v ...interface{}) {
infoSync(fmt.Sprintf(format, v...))
}
// Must checks if err is nil, otherwise logs the err and exits.
func Must(err error) {
if err != nil {
msg := formatWithCaller(err.Error(), 3)
@@ -226,46 +247,56 @@ func Must(err error) {
}
}
// SetLevel sets the logging level. It can be used to suppress some logs.
func SetLevel(level uint32) {
atomic.StoreUint32(&logLevel, level)
}
// Severe writes v into severe log.
func Severe(v ...interface{}) {
severeSync(fmt.Sprint(v...))
}
// Severef writes v with format into severe log.
func Severef(format string, v ...interface{}) {
severeSync(fmt.Sprintf(format, v...))
}
// Slow writes v into slow log.
func Slow(v ...interface{}) {
slowSync(fmt.Sprint(v...))
}
// Slowf writes v with format into slow log.
func Slowf(format string, v ...interface{}) {
slowSync(fmt.Sprintf(format, v...))
}
// Stat writes v into stat log.
func Stat(v ...interface{}) {
statSync(fmt.Sprint(v...))
}
// Statf writes v with format into stat log.
func Statf(format string, v ...interface{}) {
statSync(fmt.Sprintf(format, v...))
}
// WithCooldownMillis customizes logging on writing call stack interval.
func WithCooldownMillis(millis int) LogOption {
return func(opts *logOptions) {
opts.logStackCooldownMills = millis
}
}
// WithKeepDays customizes logging to keep logs with days.
func WithKeepDays(days int) LogOption {
return func(opts *logOptions) {
opts.keepDays = days
}
}
// WithGzip customizes logging to automatically gzip the log files.
func WithGzip() LogOption {
return func(opts *logOptions) {
opts.gzipEnabled = true
@@ -382,7 +413,7 @@ func setupWithConsole(c LogConf) {
errorLog = newLogWriter(log.New(os.Stderr, "", flags))
severeLog = newLogWriter(log.New(os.Stderr, "", flags))
slowLog = newLogWriter(log.New(os.Stderr, "", flags))
stackLog = NewLessWriter(errorLog, options.logStackCooldownMills)
stackLog = newLessWriter(errorLog, options.logStackCooldownMills)
statLog = infoLog
})
}
@@ -434,7 +465,7 @@ func setupWithFiles(c LogConf) error {
return
}
stackLog = NewLessWriter(errorLog, options.logStackCooldownMills)
stackLog = newLessWriter(errorLog, options.logStackCooldownMills)
})
return err

View File

@@ -22,13 +22,15 @@ const (
dateFormat = "2006-01-02"
hoursPerDay = 24
bufferSize = 100
defaultDirMode = 0755
defaultFileMode = 0600
defaultDirMode = 0o755
defaultFileMode = 0o600
)
// ErrLogFileClosed is an error that indicates the log file is already closed.
var ErrLogFileClosed = errors.New("error: log file closed")
type (
// A RotateRule interface is used to define the log rotating rules.
RotateRule interface {
BackupFileName() string
MarkRotated()
@@ -36,6 +38,7 @@ type (
ShallRotate() bool
}
// A RotateLogger is a Logger that can rotate log files with given rules.
RotateLogger struct {
filename string
backup string
@@ -50,6 +53,7 @@ type (
closeOnce sync.Once
}
// A DailyRotateRule is a rule to daily rotate the log files.
DailyRotateRule struct {
rotatedTime string
filename string
@@ -59,6 +63,7 @@ type (
}
)
// DefaultRotateRule is a default log rotating rule, currently DailyRotateRule.
func DefaultRotateRule(filename, delimiter string, days int, gzip bool) RotateRule {
return &DailyRotateRule{
rotatedTime: getNowDate(),
@@ -69,14 +74,17 @@ func DefaultRotateRule(filename, delimiter string, days int, gzip bool) RotateRu
}
}
// BackupFileName returns the backup filename on rotating.
func (r *DailyRotateRule) BackupFileName() string {
return fmt.Sprintf("%s%s%s", r.filename, r.delimiter, getNowDate())
}
// MarkRotated marks the rotated time of r to be the current time.
func (r *DailyRotateRule) MarkRotated() {
r.rotatedTime = getNowDate()
}
// OutdatedFiles returns the files that exceeded the keeping days.
func (r *DailyRotateRule) OutdatedFiles() []string {
if r.days <= 0 {
return nil
@@ -113,10 +121,12 @@ func (r *DailyRotateRule) OutdatedFiles() []string {
return outdates
}
// ShallRotate checks if the file should be rotated.
func (r *DailyRotateRule) ShallRotate() bool {
return len(r.rotatedTime) > 0 && getNowDate() != r.rotatedTime
}
// NewLogger returns a RotateLogger with given filename and rule, etc.
func NewLogger(filename string, rule RotateRule, compress bool) (*RotateLogger, error) {
l := &RotateLogger{
filename: filename,
@@ -133,6 +143,7 @@ func NewLogger(filename string, rule RotateRule, compress bool) (*RotateLogger,
return l, nil
}
// Close closes l.
func (l *RotateLogger) Close() error {
var err error
@@ -163,9 +174,9 @@ func (l *RotateLogger) Write(data []byte) (int, error) {
func (l *RotateLogger) getBackupFilename() string {
if len(l.backup) == 0 {
return l.rule.BackupFileName()
} else {
return l.backup
}
return l.backup
}
func (l *RotateLogger) init() error {

View File

@@ -67,6 +67,7 @@ func (l *traceLogger) write(writer io.Writer, level, content string) {
outputJson(writer, l)
}
// WithContext sets ctx to log, for keeping tracing information.
func WithContext(ctx context.Context) Logger {
return &traceLogger{
ctx: ctx,

View File

@@ -13,8 +13,8 @@ import (
)
const (
mockTraceId = "mock-trace-id"
mockSpanId = "mock-span-id"
mockTraceID = "mock-trace-id"
mockSpanID = "mock-span-id"
)
var mock tracespec.Trace = new(mockTrace)
@@ -24,8 +24,8 @@ func TestTraceLog(t *testing.T) {
atomic.StoreUint32(&initialized, 1)
ctx := context.WithValue(context.Background(), tracespec.TracingKey, mock)
WithContext(ctx).(*traceLogger).write(&buf, levelInfo, testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceId))
assert.True(t, strings.Contains(buf.String(), mockSpanId))
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
}
func TestTraceError(t *testing.T) {
@@ -36,12 +36,12 @@ func TestTraceError(t *testing.T) {
l := WithContext(ctx).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Error(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceId))
assert.True(t, strings.Contains(buf.String(), mockSpanId))
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
buf.Reset()
l.WithDuration(time.Second).Errorf(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceId))
assert.True(t, strings.Contains(buf.String(), mockSpanId))
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
}
func TestTraceInfo(t *testing.T) {
@@ -52,12 +52,12 @@ func TestTraceInfo(t *testing.T) {
l := WithContext(ctx).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Info(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceId))
assert.True(t, strings.Contains(buf.String(), mockSpanId))
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
buf.Reset()
l.WithDuration(time.Second).Infof(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceId))
assert.True(t, strings.Contains(buf.String(), mockSpanId))
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
}
func TestTraceSlow(t *testing.T) {
@@ -68,12 +68,12 @@ func TestTraceSlow(t *testing.T) {
l := WithContext(ctx).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Slow(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceId))
assert.True(t, strings.Contains(buf.String(), mockSpanId))
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
buf.Reset()
l.WithDuration(time.Second).Slowf(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceId))
assert.True(t, strings.Contains(buf.String(), mockSpanId))
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
}
func TestTraceWithoutContext(t *testing.T) {
@@ -83,22 +83,22 @@ func TestTraceWithoutContext(t *testing.T) {
l := WithContext(context.Background()).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Info(testlog)
assert.False(t, strings.Contains(buf.String(), mockTraceId))
assert.False(t, strings.Contains(buf.String(), mockSpanId))
assert.False(t, strings.Contains(buf.String(), mockTraceID))
assert.False(t, strings.Contains(buf.String(), mockSpanID))
buf.Reset()
l.WithDuration(time.Second).Infof(testlog)
assert.False(t, strings.Contains(buf.String(), mockTraceId))
assert.False(t, strings.Contains(buf.String(), mockSpanId))
assert.False(t, strings.Contains(buf.String(), mockTraceID))
assert.False(t, strings.Contains(buf.String(), mockSpanID))
}
type mockTrace struct{}
func (t mockTrace) TraceId() string {
return mockTraceId
return mockTraceID
}
func (t mockTrace) SpanId() string {
return mockSpanId
return mockSpanID
}
func (t mockTrace) Finish() {
@@ -112,5 +112,5 @@ func (t mockTrace) Follow(ctx context.Context, serviceName, operationName string
return nil, nil
}
func (t mockTrace) Visit(fn func(key string, val string) bool) {
func (t mockTrace) Visit(fn func(key, val string) bool) {
}

View File

@@ -35,9 +35,9 @@ func (o *fieldOptionsWithContext) fromString() bool {
func (o *fieldOptionsWithContext) getDefault() (string, bool) {
if o == nil {
return "", false
} else {
return o.Default, len(o.Default) > 0
}
return o.Default, len(o.Default) > 0
}
func (o *fieldOptionsWithContext) optional() bool {
@@ -55,9 +55,9 @@ func (o *fieldOptionsWithContext) options() []string {
func (o *fieldOptions) optionalDep() string {
if o == nil {
return ""
} else {
return o.OptionalDep
}
return o.OptionalDep
}
func (o *fieldOptions) toOptionsWithContext(key string, m Valuer, fullName string) (
@@ -77,29 +77,29 @@ func (o *fieldOptions) toOptionsWithContext(key string, m Valuer, fullName strin
_, selfOn := m.Value(key)
if baseOn == selfOn {
return nil, fmt.Errorf("set value for either %q or %q in %q", dep, key, fullName)
} else {
optional = baseOn
}
optional = baseOn
} else {
_, baseOn := m.Value(dep)
_, selfOn := m.Value(key)
if baseOn != selfOn {
return nil, fmt.Errorf("values for %q and %q should be both provided or both not in %q",
dep, key, fullName)
} else {
optional = !baseOn
}
optional = !baseOn
}
}
if o.fieldOptionsWithContext.Optional == optional {
return &o.fieldOptionsWithContext, nil
} else {
return &fieldOptionsWithContext{
FromString: o.FromString,
Optional: optional,
Options: o.Options,
Default: o.Default,
}, nil
}
return &fieldOptionsWithContext{
FromString: o.FromString,
Optional: optional,
Options: o.Options,
Default: o.Default,
}, nil
}

Some files were not shown because too many files have changed in this diff Show More