Compare commits

..

3 Commits

Author SHA1 Message Date
kevin
8745039877 move lang.Must into logx.Must to make sure output fatal message as json 2020-08-14 15:08:06 +08:00
kevin
9d9399ad10 confirm addition after add called in periodical executor 2020-08-14 11:50:01 +08:00
kevin
e7dd04701c add more tests 2020-08-14 11:24:56 +08:00
23 changed files with 126 additions and 79 deletions

View File

@@ -2,7 +2,7 @@ package discov
import (
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
type (
@@ -26,7 +26,7 @@ func NewFacade(endpoints []string) Facade {
func (f Facade) Client() internal.EtcdClient {
conn, err := f.registry.GetConn(f.endpoints)
lang.Must(err)
logx.Must(err)
return conn
}

View File

@@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
@@ -32,19 +33,21 @@ type (
container TaskContainer
waitGroup sync.WaitGroup
// avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
wgBarrier syncx.Barrier
guarded bool
newTicker func(duration time.Duration) timex.Ticker
lock sync.Mutex
wgBarrier syncx.Barrier
confirmChan chan lang.PlaceholderType
guarded bool
newTicker func(duration time.Duration) timex.Ticker
lock sync.Mutex
}
)
func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
executor := &PeriodicalExecutor{
// buffer 1 to let the caller go quickly
commander: make(chan interface{}, 1),
interval: interval,
container: container,
commander: make(chan interface{}, 1),
interval: interval,
container: container,
confirmChan: make(chan lang.PlaceholderType),
newTicker: func(d time.Duration) timex.Ticker {
return timex.NewTicker(interval)
},
@@ -59,10 +62,12 @@ func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *Per
func (pe *PeriodicalExecutor) Add(task interface{}) {
if vals, ok := pe.addAndCheck(task); ok {
pe.commander <- vals
<-pe.confirmChan
}
}
func (pe *PeriodicalExecutor) Flush() bool {
pe.enterExecution()
return pe.executeTasks(func() interface{} {
pe.lock.Lock()
defer pe.lock.Unlock()
@@ -114,6 +119,8 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
select {
case vals := <-pe.commander:
commanded = true
pe.enterExecution()
pe.confirmChan <- lang.Placeholder
pe.executeTasks(vals)
last = timex.Now()
case <-ticker.Chan():
@@ -135,11 +142,18 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
})
}
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
func (pe *PeriodicalExecutor) doneExecution() {
pe.waitGroup.Done()
}
func (pe *PeriodicalExecutor) enterExecution() {
pe.wgBarrier.Guard(func() {
pe.waitGroup.Add(1)
})
defer pe.waitGroup.Done()
}
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool {
defer pe.doneExecution()
ok := pe.hasTasks(tasks)
if ok {

View File

@@ -106,6 +106,40 @@ func TestPeriodicalExecutor_Bulk(t *testing.T) {
lock.Unlock()
}
func TestPeriodicalExecutor_Wait(t *testing.T) {
var lock sync.Mutex
executer := NewBulkExecutor(func(tasks []interface{}) {
lock.Lock()
defer lock.Unlock()
time.Sleep(10 * time.Millisecond)
}, WithBulkTasks(1), WithBulkInterval(time.Second))
for i := 0; i < 10; i++ {
executer.Add(1)
}
executer.Flush()
executer.Wait()
}
func TestPeriodicalExecutor_WaitFast(t *testing.T) {
const total = 3
var cnt int
var lock sync.Mutex
executer := NewBulkExecutor(func(tasks []interface{}) {
defer func() {
cnt++
}()
lock.Lock()
defer lock.Unlock()
time.Sleep(10 * time.Millisecond)
}, WithBulkTasks(1), WithBulkInterval(10*time.Millisecond))
for i := 0; i < total; i++ {
executer.Add(2)
}
executer.Flush()
executer.Wait()
assert.Equal(t, total, cnt)
}
// go test -benchtime 10s -bench .
func BenchmarkExecutor(b *testing.B) {
b.ReportAllocs()

View File

@@ -1,16 +1,8 @@
package lang
import "log"
var Placeholder PlaceholderType
type (
GenericType = interface{}
PlaceholderType = struct{}
)
func Must(err error) {
if err != nil {
log.Fatal(err)
}
}

View File

@@ -1,7 +0,0 @@
package lang
import "testing"
func TestMust(t *testing.T) {
Must(nil)
}

View File

@@ -17,7 +17,6 @@ import (
"sync/atomic"
"github.com/tal-tech/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/sysx"
"github.com/tal-tech/go-zero/core/timex"
)
@@ -46,6 +45,7 @@ const (
levelInfo = "info"
levelError = "error"
levelSevere = "severe"
levelFatal = "fatal"
levelSlow = "slow"
levelStat = "stat"
@@ -100,7 +100,7 @@ type (
)
func MustSetup(c LogConf) {
lang.Must(SetUp(c))
Must(SetUp(c))
}
// SetUp sets up the logx. If already set up, just return nil.
@@ -210,6 +210,14 @@ func Infof(format string, v ...interface{}) {
infoSync(fmt.Sprintf(format, v...))
}
func Must(err error) {
if err != nil {
msg := formatWithCaller(err.Error(), 3)
output(severeLog, levelFatal, msg)
os.Exit(1)
}
}
func SetLevel(level uint32) {
atomic.StoreUint32(&logLevel, level)
}

View File

@@ -131,6 +131,10 @@ func TestSetLevelWithDuration(t *testing.T) {
assert.Equal(t, 0, writer.builder.Len())
}
func TestMustNil(t *testing.T) {
Must(nil)
}
func BenchmarkCopyByteSliceAppend(b *testing.B) {
for i := 0; i < b.N; i++ {
var buf []byte

View File

@@ -7,7 +7,7 @@ import (
"time"
"github.com/tal-tech/go-zero/core/iox"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
const (
@@ -24,17 +24,17 @@ var (
func init() {
cpus, err := perCpuUsage()
lang.Must(err)
logx.Must(err)
cores = uint64(len(cpus))
sets, err := cpuSets()
lang.Must(err)
logx.Must(err)
quota = float64(len(sets))
cq, err := cpuQuota()
if err == nil {
if cq != -1 {
period, err := cpuPeriod()
lang.Must(err)
logx.Must(err)
limit := float64(cq) / float64(period)
if limit < quota {
@@ -44,10 +44,10 @@ func init() {
}
preSystem, err = systemCpuUsage()
lang.Must(err)
logx.Must(err)
preTotal, err = totalCpuUsage()
lang.Must(err)
logx.Must(err)
}
func RefreshCpu() uint64 {

View File

@@ -5,7 +5,6 @@ import (
"time"
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/stat"
@@ -33,7 +32,7 @@ type delayTask struct {
func init() {
var err error
timingWheel, err = collection.NewTimingWheel(time.Second, timingWheelSlots, clean)
lang.Must(err)
logx.Must(err)
proc.AddShutdownListener(func() {
timingWheel.Drain(clean)

View File

@@ -3,7 +3,7 @@ package sysx
import (
"os"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/stringx"
)
var hostname string
@@ -11,7 +11,9 @@ var hostname string
func init() {
var err error
hostname, err = os.Hostname()
lang.Must(err)
if err != nil {
hostname = stringx.RandId()
}
}
func Hostname() string {

View File

@@ -10,6 +10,7 @@ import (
"github.com/tal-tech/go-zero/core/breaker"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"gopkg.in/cheggaaa/pb.v1"
)
@@ -99,7 +100,7 @@ func main() {
gb := breaker.NewBreaker()
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "seconds,state,googleCalls,netflixCalls")

View File

@@ -5,12 +5,12 @@ import (
"time"
"github.com/tal-tech/go-zero/core/discov"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
func main() {
sub, err := discov.NewSubscriber([]string{"etcd.discovery:2379"}, "028F2C35852D", discov.Exclusive())
lang.Must(err)
logx.Must(err)
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/threading"
"gopkg.in/cheggaaa/pb.v1"
)
@@ -119,14 +120,14 @@ func main() {
flag.Parse()
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "seconds,goodOk,goodFail,goodReject,goodErrs,goodUnknowns,goodDropRatio,"+
"heavyOk,heavyFail,heavyReject,heavyErrs,heavyUnknowns,heavyDropRatio")
var gm, hm metric
dur, err := time.ParseDuration(*duration)
lang.Must(err)
logx.Must(err)
done := make(chan lang.PlaceholderType)
group := threading.NewRoutineGroup()
group.RunSafe(func() {

View File

@@ -13,7 +13,7 @@ import (
"github.com/tal-tech/go-zero/core/collection"
"github.com/tal-tech/go-zero/core/executors"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx"
"gopkg.in/cheggaaa/pb.v1"
)
@@ -47,7 +47,7 @@ func main() {
lessWriter = executors.NewLessExecutor(interval * total / 100)
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "second,maxFlight,flying,agressiveAvgFlying,lazyAvgFlying,bothAvgFlying")

View File

@@ -11,7 +11,7 @@ import (
"time"
"github.com/tal-tech/go-zero/core/fx"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
)
var (
@@ -27,7 +27,7 @@ func main() {
flag.Parse()
fp, err := os.Create("result.csv")
lang.Must(err)
logx.Must(err)
defer fp.Close()
fmt.Fprintln(fp, "seconds,total,pass,fail,drop")

View File

@@ -4,7 +4,7 @@ import (
"errors"
"strings"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
"github.com/urfave/cli"
)
@@ -32,8 +32,8 @@ func DartCommand(c *cli.Context) error {
dir = dir + "/"
}
api.Info.Title = strings.Replace(apiFile, ".api", "", -1)
lang.Must(genData(dir+"data/", api))
lang.Must(genApi(dir+"api/", api))
lang.Must(genVars(dir + "vars/"))
logx.Must(genData(dir+"data/", api))
logx.Must(genApi(dir+"api/", api))
logx.Must(genVars(dir + "vars/"))
return nil
}

View File

@@ -14,7 +14,7 @@ import (
"time"
"github.com/logrusorgru/aurora"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
apiformat "github.com/tal-tech/go-zero/tools/goctl/api/format"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
apiutil "github.com/tal-tech/go-zero/tools/goctl/api/util"
@@ -45,15 +45,15 @@ func GoCommand(c *cli.Context) error {
return err
}
lang.Must(util.MkdirIfNotExist(dir))
lang.Must(genEtc(dir, api))
lang.Must(genConfig(dir))
lang.Must(genMain(dir, api))
lang.Must(genServiceContext(dir, api))
lang.Must(genTypes(dir, api))
lang.Must(genHandlers(dir, api))
lang.Must(genRoutes(dir, api))
lang.Must(genLogic(dir, api))
logx.Must(util.MkdirIfNotExist(dir))
logx.Must(genEtc(dir, api))
logx.Must(genConfig(dir))
logx.Must(genMain(dir, api))
logx.Must(genServiceContext(dir, api))
logx.Must(genTypes(dir, api))
logx.Must(genHandlers(dir, api))
logx.Must(genRoutes(dir, api))
logx.Must(genLogic(dir, api))
// it does not work
format(dir)
createGoModFileIfNeed(dir)

View File

@@ -6,7 +6,7 @@ import (
"strings"
"github.com/logrusorgru/aurora"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
"github.com/tal-tech/go-zero/tools/goctl/util"
"github.com/urfave/cli"
@@ -36,9 +36,9 @@ func JavaCommand(c *cli.Context) error {
packetName = packetName[:len(packetName)-4]
}
lang.Must(util.MkdirIfNotExist(dir))
lang.Must(genPacket(dir, packetName, api))
lang.Must(genComponents(dir, packetName, api))
logx.Must(util.MkdirIfNotExist(dir))
logx.Must(genPacket(dir, packetName, api))
logx.Must(genComponents(dir, packetName, api))
fmt.Println(aurora.Green("Done."))
return nil

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"os"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
)
@@ -14,8 +14,8 @@ func main() {
}
p, err := parser.NewParser(os.Args[1])
lang.Must(err)
logx.Must(err)
api, err := p.Parse()
lang.Must(err)
logx.Must(err)
fmt.Println(api)
}

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"github.com/logrusorgru/aurora"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/parser"
"github.com/tal-tech/go-zero/tools/goctl/util"
"github.com/urfave/cli"
@@ -34,9 +34,9 @@ func TsCommand(c *cli.Context) error {
return err
}
lang.Must(util.MkdirIfNotExist(dir))
lang.Must(genHandler(dir, webApi, caller, api, unwrapApi))
lang.Must(genComponents(dir, api))
logx.Must(util.MkdirIfNotExist(dir))
logx.Must(genHandler(dir, webApi, caller, api, unwrapApi))
logx.Must(genComponents(dir, api))
fmt.Println(aurora.Green("Done."))
return nil

View File

@@ -8,13 +8,13 @@ import (
"path"
"strings"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/api/spec"
"github.com/tal-tech/go-zero/tools/goctl/util"
)
func MaybeCreateFile(dir, subdir, file string) (fp *os.File, created bool, err error) {
lang.Must(util.MkdirIfNotExist(path.Join(dir, subdir)))
logx.Must(util.MkdirIfNotExist(path.Join(dir, subdir)))
fpath := path.Join(dir, subdir, file)
if util.FileExists(fpath) {
fmt.Printf("%s exists, ignored generation\n", fpath)

View File

@@ -8,7 +8,6 @@ import (
"github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/core/hash"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/tools/goctl/update/config"
"github.com/tal-tech/go-zero/tools/goctl/util"
@@ -56,5 +55,5 @@ func main() {
fs := http.FileServer(http.Dir(c.FileDir))
http.Handle(c.FilePath, http.StripPrefix(c.FilePath, forChksumHandler(path.Join(c.FileDir, filename), fs)))
lang.Must(http.ListenAndServe(c.ListenOn, nil))
logx.Must(http.ListenAndServe(c.ListenOn, nil))
}

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"strings"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/urfave/cli"
)
@@ -14,7 +14,7 @@ func FileModelCommand(c *cli.Context) error {
if len(configFile) == 0 {
return errors.New("missing config value")
}
lang.Must(genModelWithConfigFile(configFile))
logx.Must(genModelWithConfigFile(configFile))
return nil
}
@@ -36,6 +36,6 @@ func CmdModelCommand(c *cli.Context) error {
user := addressArr[0]
host := addressArr[1]
address = fmt.Sprintf("%v@tcp(%v)/information_schema", user, host)
lang.Must(genModelWithDataSource(address, schema, force, redis, nil))
logx.Must(genModelWithDataSource(address, schema, force, redis, nil))
return nil
}