fix golint issues in core/stat (#515)
* change to use ServiceGroup to make it more clear * fix golint issues in core/stat
This commit is contained in:
@@ -25,29 +25,29 @@ type (
|
|||||||
Stopper
|
Stopper
|
||||||
}
|
}
|
||||||
|
|
||||||
// A Group is a group of services.
|
// A ServiceGroup is a group of services.
|
||||||
Group struct {
|
ServiceGroup struct {
|
||||||
services []Service
|
services []Service
|
||||||
stopOnce func()
|
stopOnce func()
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewGroup returns a Group.
|
// NewServiceGroup returns a ServiceGroup.
|
||||||
func NewGroup() *Group {
|
func NewServiceGroup() *ServiceGroup {
|
||||||
sg := new(Group)
|
sg := new(ServiceGroup)
|
||||||
sg.stopOnce = syncx.Once(sg.doStop)
|
sg.stopOnce = syncx.Once(sg.doStop)
|
||||||
return sg
|
return sg
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add adds service into sg.
|
// Add adds service into sg.
|
||||||
func (sg *Group) Add(service Service) {
|
func (sg *ServiceGroup) Add(service Service) {
|
||||||
sg.services = append(sg.services, service)
|
sg.services = append(sg.services, service)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the Group.
|
// Start starts the ServiceGroup.
|
||||||
// There should not be any logic code after calling this method, because this method is a blocking one.
|
// There should not be any logic code after calling this method, because this method is a blocking one.
|
||||||
// Also, quitting this method will close the logx output.
|
// Also, quitting this method will close the logx output.
|
||||||
func (sg *Group) Start() {
|
func (sg *ServiceGroup) Start() {
|
||||||
proc.AddShutdownListener(func() {
|
proc.AddShutdownListener(func() {
|
||||||
log.Println("Shutting down...")
|
log.Println("Shutting down...")
|
||||||
sg.stopOnce()
|
sg.stopOnce()
|
||||||
@@ -56,12 +56,12 @@ func (sg *Group) Start() {
|
|||||||
sg.doStart()
|
sg.doStart()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the Group.
|
// Stop stops the ServiceGroup.
|
||||||
func (sg *Group) Stop() {
|
func (sg *ServiceGroup) Stop() {
|
||||||
sg.stopOnce()
|
sg.stopOnce()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sg *Group) doStart() {
|
func (sg *ServiceGroup) doStart() {
|
||||||
routineGroup := threading.NewRoutineGroup()
|
routineGroup := threading.NewRoutineGroup()
|
||||||
|
|
||||||
for i := range sg.services {
|
for i := range sg.services {
|
||||||
@@ -74,7 +74,7 @@ func (sg *Group) doStart() {
|
|||||||
routineGroup.Wait()
|
routineGroup.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sg *Group) doStop() {
|
func (sg *ServiceGroup) doStop() {
|
||||||
for _, service := range sg.services {
|
for _, service := range sg.services {
|
||||||
service.Stop()
|
service.Stop()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ func TestServiceGroup(t *testing.T) {
|
|||||||
multipliers := []int{2, 3, 5, 7}
|
multipliers := []int{2, 3, 5, 7}
|
||||||
want := 1
|
want := 1
|
||||||
|
|
||||||
group := NewGroup()
|
group := NewServiceGroup()
|
||||||
for _, multiplier := range multipliers {
|
for _, multiplier := range multipliers {
|
||||||
want *= multiplier
|
want *= multiplier
|
||||||
service := newMockedService(multiplier)
|
service := newMockedService(multiplier)
|
||||||
@@ -68,7 +68,7 @@ func TestServiceGroup_WithStart(t *testing.T) {
|
|||||||
var wait sync.WaitGroup
|
var wait sync.WaitGroup
|
||||||
var lock sync.Mutex
|
var lock sync.Mutex
|
||||||
wait.Add(len(multipliers))
|
wait.Add(len(multipliers))
|
||||||
group := NewGroup()
|
group := NewServiceGroup()
|
||||||
for _, multiplier := range multipliers {
|
for _, multiplier := range multipliers {
|
||||||
var mul = multiplier
|
var mul = multiplier
|
||||||
group.Add(WithStart(func() {
|
group.Add(WithStart(func() {
|
||||||
@@ -95,7 +95,7 @@ func TestServiceGroup_WithStarter(t *testing.T) {
|
|||||||
var wait sync.WaitGroup
|
var wait sync.WaitGroup
|
||||||
var lock sync.Mutex
|
var lock sync.Mutex
|
||||||
wait.Add(len(multipliers))
|
wait.Add(len(multipliers))
|
||||||
group := NewGroup()
|
group := NewServiceGroup()
|
||||||
for _, multiplier := range multipliers {
|
for _, multiplier := range multipliers {
|
||||||
var mul = multiplier
|
var mul = multiplier
|
||||||
group.Add(WithStarter(mockedStarter{
|
group.Add(WithStarter(mockedStarter{
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ func init() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Report reports given message.
|
||||||
func Report(msg string) {
|
func Report(msg string) {
|
||||||
lock.RLock()
|
lock.RLock()
|
||||||
fn := reporter
|
fn := reporter
|
||||||
@@ -63,6 +64,7 @@ func Report(msg string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetReporter sets the given reporter.
|
||||||
func SetReporter(fn func(string)) {
|
func SetReporter(fn func(string)) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
|
|||||||
@@ -2,8 +2,10 @@
|
|||||||
|
|
||||||
package stat
|
package stat
|
||||||
|
|
||||||
|
// Report reports given message.
|
||||||
func Report(string) {
|
func Report(string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetReporter sets the given reporter.
|
||||||
func SetReporter(func(string)) {
|
func SetReporter(func(string)) {
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,20 +7,23 @@ import (
|
|||||||
|
|
||||||
"github.com/tal-tech/go-zero/core/executors"
|
"github.com/tal-tech/go-zero/core/executors"
|
||||||
"github.com/tal-tech/go-zero/core/logx"
|
"github.com/tal-tech/go-zero/core/logx"
|
||||||
|
"github.com/tal-tech/go-zero/core/syncx"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
LogInterval = time.Minute
|
logInterval = time.Minute
|
||||||
|
|
||||||
writerLock sync.Mutex
|
writerLock sync.Mutex
|
||||||
reportWriter Writer = nil
|
reportWriter Writer = nil
|
||||||
|
logEnabled = syncx.ForAtomicBool(true)
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
// Writer interface wraps the Write method.
|
||||||
Writer interface {
|
Writer interface {
|
||||||
Write(report *StatReport) error
|
Write(report *StatReport) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A StatReport is a stat report entry.
|
||||||
StatReport struct {
|
StatReport struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Timestamp int64 `json:"tm"`
|
Timestamp int64 `json:"tm"`
|
||||||
@@ -34,18 +37,26 @@ type (
|
|||||||
Top99p9th float32 `json:"t99p9"`
|
Top99p9th float32 `json:"t99p9"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A Metrics is used to log and report stat reports.
|
||||||
Metrics struct {
|
Metrics struct {
|
||||||
executor *executors.PeriodicalExecutor
|
executor *executors.PeriodicalExecutor
|
||||||
container *metricsContainer
|
container *metricsContainer
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DisableLog disables logs of stats.
|
||||||
|
func DisableLog() {
|
||||||
|
logEnabled.Set(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetReportWriter sets the report writer.
|
||||||
func SetReportWriter(writer Writer) {
|
func SetReportWriter(writer Writer) {
|
||||||
writerLock.Lock()
|
writerLock.Lock()
|
||||||
reportWriter = writer
|
reportWriter = writer
|
||||||
writerLock.Unlock()
|
writerLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewMetrics returns a Metrics.
|
||||||
func NewMetrics(name string) *Metrics {
|
func NewMetrics(name string) *Metrics {
|
||||||
container := &metricsContainer{
|
container := &metricsContainer{
|
||||||
name: name,
|
name: name,
|
||||||
@@ -53,21 +64,24 @@ func NewMetrics(name string) *Metrics {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &Metrics{
|
return &Metrics{
|
||||||
executor: executors.NewPeriodicalExecutor(LogInterval, container),
|
executor: executors.NewPeriodicalExecutor(logInterval, container),
|
||||||
container: container,
|
container: container,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add adds task to m.
|
||||||
func (m *Metrics) Add(task Task) {
|
func (m *Metrics) Add(task Task) {
|
||||||
m.executor.Add(task)
|
m.executor.Add(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddDrop adds a drop to m.
|
||||||
func (m *Metrics) AddDrop() {
|
func (m *Metrics) AddDrop() {
|
||||||
m.executor.Add(Task{
|
m.executor.Add(Task{
|
||||||
Drop: true,
|
Drop: true,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetName sets the name of m.
|
||||||
func (m *Metrics) SetName(name string) {
|
func (m *Metrics) SetName(name string) {
|
||||||
m.executor.Sync(func() {
|
m.executor.Sync(func() {
|
||||||
m.container.name = name
|
m.container.name = name
|
||||||
@@ -113,7 +127,7 @@ func (c *metricsContainer) Execute(v interface{}) {
|
|||||||
Name: c.name,
|
Name: c.name,
|
||||||
Timestamp: time.Now().Unix(),
|
Timestamp: time.Now().Unix(),
|
||||||
Pid: c.pid,
|
Pid: c.pid,
|
||||||
ReqsPerSecond: float32(size) / float32(LogInterval/time.Second),
|
ReqsPerSecond: float32(size) / float32(logInterval/time.Second),
|
||||||
Drops: drops,
|
Drops: drops,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -192,10 +206,12 @@ func getTopDuration(tasks []Task) float32 {
|
|||||||
|
|
||||||
func log(report *StatReport) {
|
func log(report *StatReport) {
|
||||||
writeReport(report)
|
writeReport(report)
|
||||||
logx.Statf("(%s) - qps: %.1f/s, drops: %d, avg time: %.1fms, med: %.1fms, "+
|
if logEnabled.True() {
|
||||||
"90th: %.1fms, 99th: %.1fms, 99.9th: %.1fms",
|
logx.Statf("(%s) - qps: %.1f/s, drops: %d, avg time: %.1fms, med: %.1fms, "+
|
||||||
report.Name, report.ReqsPerSecond, report.Drops, report.Average, report.Median,
|
"90th: %.1fms, 99th: %.1fms, 99.9th: %.1fms",
|
||||||
report.Top90th, report.Top99th, report.Top99p9th)
|
report.Name, report.ReqsPerSecond, report.Drops, report.Average, report.Median,
|
||||||
|
report.Top90th, report.Top99th, report.Top99p9th)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeReport(report *StatReport) {
|
func writeReport(report *StatReport) {
|
||||||
|
|||||||
@@ -6,9 +6,14 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/tal-tech/go-zero/core/logx"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMetrics(t *testing.T) {
|
func TestMetrics(t *testing.T) {
|
||||||
|
logx.Disable()
|
||||||
|
DisableLog()
|
||||||
|
defer logEnabled.Set(true)
|
||||||
|
|
||||||
counts := []int{1, 5, 10, 100, 1000, 1000}
|
counts := []int{1, 5, 10, 100, 1000, 1000}
|
||||||
for _, count := range counts {
|
for _, count := range counts {
|
||||||
m := NewMetrics("foo")
|
m := NewMetrics("foo")
|
||||||
|
|||||||
@@ -12,12 +12,15 @@ import (
|
|||||||
|
|
||||||
const httpTimeout = time.Second * 5
|
const httpTimeout = time.Second * 5
|
||||||
|
|
||||||
|
// ErrWriteFailed is an error that indicates failed to submit a StatReport.
|
||||||
var ErrWriteFailed = errors.New("submit failed")
|
var ErrWriteFailed = errors.New("submit failed")
|
||||||
|
|
||||||
|
// A RemoteWriter is a writer to write StatReport.
|
||||||
type RemoteWriter struct {
|
type RemoteWriter struct {
|
||||||
endpoint string
|
endpoint string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewRemoteWriter returns a RemoteWriter.
|
||||||
func NewRemoteWriter(endpoint string) Writer {
|
func NewRemoteWriter(endpoint string) Writer {
|
||||||
return &RemoteWriter{
|
return &RemoteWriter{
|
||||||
endpoint: endpoint,
|
endpoint: endpoint,
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package stat
|
|||||||
|
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
|
// A Task is a task that is reported to Metrics.
|
||||||
type Task struct {
|
type Task struct {
|
||||||
Drop bool
|
Drop bool
|
||||||
Duration time.Duration
|
Duration time.Duration
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ func init() {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CpuUsage returns current cpu usage.
|
||||||
func CpuUsage() int64 {
|
func CpuUsage() int64 {
|
||||||
return atomic.LoadInt64(&cpuUsage)
|
return atomic.LoadInt64(&cpuUsage)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user