refactor
This commit is contained in:
@@ -1,83 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func WithStreamClientInterceptors(interceptors ...grpc.StreamClientInterceptor) grpc.DialOption {
|
||||
return grpc.WithStreamInterceptor(chainStreamClientInterceptors(interceptors...))
|
||||
}
|
||||
|
||||
func WithUnaryClientInterceptors(interceptors ...grpc.UnaryClientInterceptor) grpc.DialOption {
|
||||
return grpc.WithUnaryInterceptor(chainUnaryClientInterceptors(interceptors...))
|
||||
}
|
||||
|
||||
func chainStreamClientInterceptors(interceptors ...grpc.StreamClientInterceptor) grpc.StreamClientInterceptor {
|
||||
switch len(interceptors) {
|
||||
case 0:
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
|
||||
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
return streamer(ctx, desc, cc, method, opts...)
|
||||
}
|
||||
case 1:
|
||||
return interceptors[0]
|
||||
default:
|
||||
last := len(interceptors) - 1
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
|
||||
method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
var chainStreamer grpc.Streamer
|
||||
var current int
|
||||
|
||||
chainStreamer = func(curCtx context.Context, curDesc *grpc.StreamDesc, curCc *grpc.ClientConn,
|
||||
curMethod string, curOpts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
if current == last {
|
||||
return streamer(curCtx, curDesc, curCc, curMethod, curOpts...)
|
||||
}
|
||||
|
||||
current++
|
||||
clientStream, err := interceptors[current](curCtx, curDesc, curCc, curMethod, chainStreamer, curOpts...)
|
||||
current--
|
||||
|
||||
return clientStream, err
|
||||
}
|
||||
|
||||
return interceptors[0](ctx, desc, cc, method, chainStreamer, opts...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func chainUnaryClientInterceptors(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
|
||||
switch len(interceptors) {
|
||||
case 0:
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
case 1:
|
||||
return interceptors[0]
|
||||
default:
|
||||
last := len(interceptors) - 1
|
||||
return func(ctx context.Context, method string, req, reply interface{},
|
||||
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
var chainInvoker grpc.UnaryInvoker
|
||||
var current int
|
||||
|
||||
chainInvoker = func(curCtx context.Context, curMethod string, curReq, curReply interface{},
|
||||
curCc *grpc.ClientConn, curOpts ...grpc.CallOption) error {
|
||||
if current == last {
|
||||
return invoker(curCtx, curMethod, curReq, curReply, curCc, curOpts...)
|
||||
}
|
||||
|
||||
current++
|
||||
err := interceptors[current](curCtx, curMethod, curReq, curReply, curCc, chainInvoker, curOpts...)
|
||||
current--
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return interceptors[0](ctx, method, req, reply, cc, chainInvoker, opts...)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,81 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func WithStreamServerInterceptors(interceptors ...grpc.StreamServerInterceptor) grpc.ServerOption {
|
||||
return grpc.StreamInterceptor(chainStreamServerInterceptors(interceptors...))
|
||||
}
|
||||
|
||||
func WithUnaryServerInterceptors(interceptors ...grpc.UnaryServerInterceptor) grpc.ServerOption {
|
||||
return grpc.UnaryInterceptor(chainUnaryServerInterceptors(interceptors...))
|
||||
}
|
||||
|
||||
func chainStreamServerInterceptors(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor {
|
||||
switch len(interceptors) {
|
||||
case 0:
|
||||
return func(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) error {
|
||||
return handler(srv, stream)
|
||||
}
|
||||
case 1:
|
||||
return interceptors[0]
|
||||
default:
|
||||
last := len(interceptors) - 1
|
||||
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) error {
|
||||
var chainHandler grpc.StreamHandler
|
||||
var current int
|
||||
|
||||
chainHandler = func(curSrv interface{}, curStream grpc.ServerStream) error {
|
||||
if current == last {
|
||||
return handler(curSrv, curStream)
|
||||
}
|
||||
|
||||
current++
|
||||
err := interceptors[current](curSrv, curStream, info, chainHandler)
|
||||
current--
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return interceptors[0](srv, stream, info, chainHandler)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func chainUnaryServerInterceptors(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
|
||||
switch len(interceptors) {
|
||||
case 0:
|
||||
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
|
||||
interface{}, error) {
|
||||
return handler(ctx, req)
|
||||
}
|
||||
case 1:
|
||||
return interceptors[0]
|
||||
default:
|
||||
last := len(interceptors) - 1
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
|
||||
interface{}, error) {
|
||||
var chainHandler grpc.UnaryHandler
|
||||
var current int
|
||||
|
||||
chainHandler = func(curCtx context.Context, curReq interface{}) (interface{}, error) {
|
||||
if current == last {
|
||||
return handler(curCtx, curReq)
|
||||
}
|
||||
|
||||
current++
|
||||
resp, err := interceptors[current](curCtx, curReq, info, chainHandler)
|
||||
current--
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
return interceptors[0](ctx, req, info, chainHandler)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,71 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"zero/core/rpc/clientinterceptors"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const dialTimeout = time.Second * 3
|
||||
|
||||
type (
|
||||
ClientOptions struct {
|
||||
Timeout time.Duration
|
||||
DialOptions []grpc.DialOption
|
||||
}
|
||||
|
||||
ClientOption func(options *ClientOptions)
|
||||
|
||||
Client interface {
|
||||
Next() (*grpc.ClientConn, bool)
|
||||
}
|
||||
)
|
||||
|
||||
func WithDialOption(opt grpc.DialOption) ClientOption {
|
||||
return func(options *ClientOptions) {
|
||||
options.DialOptions = append(options.DialOptions, opt)
|
||||
}
|
||||
}
|
||||
|
||||
func WithTimeout(timeout time.Duration) ClientOption {
|
||||
return func(options *ClientOptions) {
|
||||
options.Timeout = timeout
|
||||
}
|
||||
}
|
||||
|
||||
func buildDialOptions(opts ...ClientOption) []grpc.DialOption {
|
||||
var clientOptions ClientOptions
|
||||
for _, opt := range opts {
|
||||
opt(&clientOptions)
|
||||
}
|
||||
|
||||
options := []grpc.DialOption{
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock(),
|
||||
WithUnaryClientInterceptors(
|
||||
clientinterceptors.BreakerInterceptor,
|
||||
clientinterceptors.DurationInterceptor,
|
||||
clientinterceptors.PromMetricInterceptor,
|
||||
clientinterceptors.TimeoutInterceptor(clientOptions.Timeout),
|
||||
clientinterceptors.TracingInterceptor,
|
||||
),
|
||||
}
|
||||
|
||||
return append(options, clientOptions.DialOptions...)
|
||||
}
|
||||
|
||||
func dial(server string, opts ...ClientOption) (*grpc.ClientConn, error) {
|
||||
options := buildDialOptions(opts...)
|
||||
timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
|
||||
defer cancel()
|
||||
conn, err := grpc.DialContext(timeCtx, server, options...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rpc dial: %s, error: %s", server, err.Error())
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
package clientinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
|
||||
"zero/core/breaker"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func acceptable(err error) bool {
|
||||
switch status.Code(err) {
|
||||
case codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss:
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func BreakerInterceptor(ctx context.Context, method string, req, reply interface{},
|
||||
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
breakerName := path.Join(cc.Target(), method)
|
||||
return breaker.DoWithAcceptable(breakerName, func() error {
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}, acceptable)
|
||||
}
|
||||
@@ -1,51 +0,0 @@
|
||||
package clientinterceptors
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"zero/core/breaker"
|
||||
"zero/core/stat"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func init() {
|
||||
stat.SetReporter(nil)
|
||||
}
|
||||
|
||||
type mockError struct {
|
||||
st *status.Status
|
||||
}
|
||||
|
||||
func (m mockError) GRPCStatus() *status.Status {
|
||||
return m.st
|
||||
}
|
||||
|
||||
func (m mockError) Error() string {
|
||||
return "mocked error"
|
||||
}
|
||||
|
||||
func TestBreakerInterceptorNotFound(t *testing.T) {
|
||||
err := mockError{st: status.New(codes.NotFound, "any")}
|
||||
for i := 0; i < 1000; i++ {
|
||||
assert.Equal(t, err, breaker.DoWithAcceptable("call", func() error {
|
||||
return err
|
||||
}, acceptable))
|
||||
}
|
||||
}
|
||||
|
||||
func TestBreakerInterceptorDeadlineExceeded(t *testing.T) {
|
||||
err := mockError{st: status.New(codes.DeadlineExceeded, "any")}
|
||||
errs := make(map[error]int)
|
||||
for i := 0; i < 1000; i++ {
|
||||
e := breaker.DoWithAcceptable("call", func() error {
|
||||
return err
|
||||
}, acceptable)
|
||||
errs[e]++
|
||||
}
|
||||
assert.Equal(t, 2, len(errs))
|
||||
assert.True(t, errs[err] > 0)
|
||||
assert.True(t, errs[breaker.ErrServiceUnavailable] > 0)
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
package clientinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"zero/core/logx"
|
||||
"zero/core/timex"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const slowThreshold = time.Millisecond * 500
|
||||
|
||||
func DurationInterceptor(ctx context.Context, method string, req, reply interface{},
|
||||
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
serverName := path.Join(cc.Target(), method)
|
||||
start := timex.Now()
|
||||
err := invoker(ctx, method, req, reply, cc, opts...)
|
||||
if err != nil {
|
||||
logx.WithDuration(timex.Since(start)).Infof("fail - %s - %v - %s", serverName, req, err.Error())
|
||||
} else {
|
||||
elapsed := timex.Since(start)
|
||||
if elapsed > slowThreshold {
|
||||
logx.WithDuration(elapsed).Slowf("[RPC] ok - slowcall - %s - %v - %v", serverName, req, reply)
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -1,43 +0,0 @@
|
||||
package clientinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"zero/core/metric"
|
||||
"zero/core/timex"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
const clientNamespace = "rpc_client"
|
||||
|
||||
var (
|
||||
metricClientReqDur = metric.NewHistogramVec(&metric.HistogramVecOpts{
|
||||
Namespace: clientNamespace,
|
||||
Subsystem: "requests",
|
||||
Name: "duration_ms",
|
||||
Help: "rpc client requests duration(ms).",
|
||||
Labels: []string{"method"},
|
||||
Buckets: []float64{5, 10, 25, 50, 100, 250, 500, 1000},
|
||||
})
|
||||
|
||||
metricClientReqCodeTotal = metric.NewCounterVec(&metric.CounterVecOpts{
|
||||
Namespace: clientNamespace,
|
||||
Subsystem: "requests",
|
||||
Name: "code_total",
|
||||
Help: "rpc client requests code count.",
|
||||
Labels: []string{"method", "code"},
|
||||
})
|
||||
)
|
||||
|
||||
func PromMetricInterceptor(ctx context.Context, method string, req, reply interface{},
|
||||
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
startTime := timex.Now()
|
||||
err := invoker(ctx, method, req, reply, cc, opts...)
|
||||
metricClientReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), method)
|
||||
metricClientReqCodeTotal.Inc(method, strconv.Itoa(int(status.Code(err))))
|
||||
return err
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
package clientinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"zero/core/contextx"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const defaultTimeout = time.Second * 2
|
||||
|
||||
func TimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
|
||||
if timeout <= 0 {
|
||||
timeout = defaultTimeout
|
||||
}
|
||||
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
|
||||
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
ctx, cancel := contextx.ShrinkDeadline(ctx, timeout)
|
||||
defer cancel()
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
package clientinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"zero/core/trace"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func TracingInterceptor(ctx context.Context, method string, req, reply interface{},
|
||||
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
ctx, span := trace.StartClientSpan(ctx, cc.Target(), method)
|
||||
defer span.Finish()
|
||||
|
||||
var pairs []string
|
||||
span.Visit(func(key, val string) bool {
|
||||
pairs = append(pairs, key, val)
|
||||
return true
|
||||
})
|
||||
ctx = metadata.AppendToOutgoingContext(ctx, pairs...)
|
||||
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/balancer/roundrobin"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
)
|
||||
|
||||
type DirectClient struct {
|
||||
conn *grpc.ClientConn
|
||||
}
|
||||
|
||||
func NewDirectClient(server string, opts ...ClientOption) (*DirectClient, error) {
|
||||
opts = append(opts, WithDialOption(grpc.WithBalancerName(roundrobin.Name)))
|
||||
conn, err := dial(server, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &DirectClient{
|
||||
conn: conn,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *DirectClient) Next() (*grpc.ClientConn, bool) {
|
||||
state := c.conn.GetState()
|
||||
if state == connectivity.Ready {
|
||||
return c.conn, true
|
||||
} else {
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
@@ -1,74 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"zero/core/logx"
|
||||
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
// because grpclog.errorLog is not exported, we need to define our own.
|
||||
const errorLevel = 2
|
||||
|
||||
var once sync.Once
|
||||
|
||||
type Logger struct{}
|
||||
|
||||
func InitLogger() {
|
||||
once.Do(func() {
|
||||
grpclog.SetLoggerV2(new(Logger))
|
||||
})
|
||||
}
|
||||
|
||||
func (l *Logger) Error(args ...interface{}) {
|
||||
logx.Error(args...)
|
||||
}
|
||||
|
||||
func (l *Logger) Errorf(format string, args ...interface{}) {
|
||||
logx.Errorf(format, args...)
|
||||
}
|
||||
|
||||
func (l *Logger) Errorln(args ...interface{}) {
|
||||
logx.Error(args...)
|
||||
}
|
||||
|
||||
func (l *Logger) Fatal(args ...interface{}) {
|
||||
logx.Error(args...)
|
||||
}
|
||||
|
||||
func (l *Logger) Fatalf(format string, args ...interface{}) {
|
||||
logx.Errorf(format, args...)
|
||||
}
|
||||
|
||||
func (l *Logger) Fatalln(args ...interface{}) {
|
||||
logx.Error(args...)
|
||||
}
|
||||
|
||||
func (l *Logger) Info(args ...interface{}) {
|
||||
// ignore builtin grpc info
|
||||
}
|
||||
|
||||
func (l *Logger) Infoln(args ...interface{}) {
|
||||
// ignore builtin grpc info
|
||||
}
|
||||
|
||||
func (l *Logger) Infof(format string, args ...interface{}) {
|
||||
// ignore builtin grpc info
|
||||
}
|
||||
|
||||
func (l *Logger) V(v int) bool {
|
||||
return v >= errorLevel
|
||||
}
|
||||
|
||||
func (l *Logger) Warning(args ...interface{}) {
|
||||
// ignore builtin grpc warning
|
||||
}
|
||||
|
||||
func (l *Logger) Warningln(args ...interface{}) {
|
||||
// ignore builtin grpc warning
|
||||
}
|
||||
|
||||
func (l *Logger) Warningf(format string, args ...interface{}) {
|
||||
// ignore builtin grpc warning
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import "zero/core/discov"
|
||||
|
||||
func NewRpcPubServer(etcdEndpoints []string, etcdKey, listenOn string, opts ...ServerOption) (Server, error) {
|
||||
registerEtcd := func() error {
|
||||
pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, listenOn)
|
||||
return pubClient.KeepAlive()
|
||||
}
|
||||
server := keepAliveServer{
|
||||
registerEtcd: registerEtcd,
|
||||
Server: NewRpcServer(listenOn, opts...),
|
||||
}
|
||||
|
||||
return server, nil
|
||||
}
|
||||
|
||||
type keepAliveServer struct {
|
||||
registerEtcd func() error
|
||||
Server
|
||||
}
|
||||
|
||||
func (ags keepAliveServer) Start(fn RegisterFn) error {
|
||||
if err := ags.registerEtcd(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ags.Server.Start(fn)
|
||||
}
|
||||
@@ -1,82 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"zero/core/proc"
|
||||
"zero/core/rpc/serverinterceptors"
|
||||
"zero/core/stat"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type (
|
||||
ServerOption func(options *rpcServerOptions)
|
||||
|
||||
rpcServerOptions struct {
|
||||
metrics *stat.Metrics
|
||||
}
|
||||
|
||||
rpcServer struct {
|
||||
*baseRpcServer
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
InitLogger()
|
||||
}
|
||||
|
||||
func NewRpcServer(address string, opts ...ServerOption) Server {
|
||||
var options rpcServerOptions
|
||||
for _, opt := range opts {
|
||||
opt(&options)
|
||||
}
|
||||
if options.metrics == nil {
|
||||
options.metrics = stat.NewMetrics(address)
|
||||
}
|
||||
|
||||
return &rpcServer{
|
||||
baseRpcServer: newBaseRpcServer(address, options.metrics),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *rpcServer) SetName(name string) {
|
||||
s.baseRpcServer.SetName(name)
|
||||
}
|
||||
|
||||
func (s *rpcServer) Start(register RegisterFn) error {
|
||||
lis, err := net.Listen("tcp", s.address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
unaryInterceptors := []grpc.UnaryServerInterceptor{
|
||||
serverinterceptors.UnaryCrashInterceptor(),
|
||||
serverinterceptors.UnaryStatInterceptor(s.metrics),
|
||||
serverinterceptors.UnaryPromMetricInterceptor(),
|
||||
}
|
||||
unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
|
||||
streamInterceptors := []grpc.StreamServerInterceptor{
|
||||
serverinterceptors.StreamCrashInterceptor,
|
||||
}
|
||||
streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
|
||||
options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...),
|
||||
WithStreamServerInterceptors(streamInterceptors...))
|
||||
server := grpc.NewServer(options...)
|
||||
register(server)
|
||||
// we need to make sure all others are wrapped up
|
||||
// so we do graceful stop at shutdown phase instead of wrap up phase
|
||||
shutdownCalled := proc.AddShutdownListener(func() {
|
||||
server.GracefulStop()
|
||||
})
|
||||
err = server.Serve(lis)
|
||||
shutdownCalled()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func WithMetrics(metrics *stat.Metrics) ServerOption {
|
||||
return func(options *rpcServerOptions) {
|
||||
options.metrics = metrics
|
||||
}
|
||||
}
|
||||
@@ -1,102 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"zero/core/discov"
|
||||
"zero/core/logx"
|
||||
"zero/core/threading"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
)
|
||||
|
||||
const (
|
||||
coolOffTime = time.Second * 5
|
||||
retryTimes = 3
|
||||
)
|
||||
|
||||
type (
|
||||
RoundRobinSubClient struct {
|
||||
*discov.RoundRobinSubClient
|
||||
}
|
||||
|
||||
ConsistentSubClient struct {
|
||||
*discov.ConsistentSubClient
|
||||
}
|
||||
)
|
||||
|
||||
func NewRoundRobinRpcClient(endpoints []string, key string, opts ...ClientOption) (*RoundRobinSubClient, error) {
|
||||
subClient, err := discov.NewRoundRobinSubClient(endpoints, key, func(server string) (interface{}, error) {
|
||||
return dial(server, opts...)
|
||||
}, func(server string, conn interface{}) error {
|
||||
return closeConn(conn.(*grpc.ClientConn))
|
||||
}, discov.Exclusive())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return &RoundRobinSubClient{subClient}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func NewConsistentRpcClient(endpoints []string, key string, opts ...ClientOption) (*ConsistentSubClient, error) {
|
||||
subClient, err := discov.NewConsistentSubClient(endpoints, key, func(server string) (interface{}, error) {
|
||||
return dial(server, opts...)
|
||||
}, func(server string, conn interface{}) error {
|
||||
return closeConn(conn.(*grpc.ClientConn))
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return &ConsistentSubClient{subClient}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (cli *RoundRobinSubClient) Next() (*grpc.ClientConn, bool) {
|
||||
return next(func() (interface{}, bool) {
|
||||
return cli.RoundRobinSubClient.Next()
|
||||
})
|
||||
}
|
||||
|
||||
func (cli *ConsistentSubClient) Next(key string) (*grpc.ClientConn, bool) {
|
||||
return next(func() (interface{}, bool) {
|
||||
return cli.ConsistentSubClient.Next(key)
|
||||
})
|
||||
}
|
||||
|
||||
func closeConn(conn *grpc.ClientConn) error {
|
||||
// why to close the conn asynchronously is because maybe another goroutine
|
||||
// is using the same conn, we can wait the coolOffTime to let the other
|
||||
// goroutine to finish using the conn.
|
||||
// after the conn unregistered, the balancer will not assign the conn,
|
||||
// but maybe the already assigned tasks are still using it.
|
||||
threading.GoSafe(func() {
|
||||
time.Sleep(coolOffTime)
|
||||
if err := conn.Close(); err != nil {
|
||||
logx.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func next(nextFn func() (interface{}, bool)) (*grpc.ClientConn, bool) {
|
||||
for i := 0; i < retryTimes; i++ {
|
||||
v, ok := nextFn()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
conn, yes := v.(*grpc.ClientConn)
|
||||
if !yes {
|
||||
break
|
||||
}
|
||||
|
||||
switch conn.GetState() {
|
||||
case connectivity.Ready:
|
||||
return conn, true
|
||||
}
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
@@ -1,40 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type RRClient struct {
|
||||
conns []*grpc.ClientConn
|
||||
index int
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func NewRRClient(endpoints []string) (*RRClient, error) {
|
||||
var conns []*grpc.ClientConn
|
||||
for _, endpoint := range endpoints {
|
||||
conn, err := dial(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conns = append(conns, conn)
|
||||
}
|
||||
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
return &RRClient{
|
||||
conns: conns,
|
||||
index: rand.Intn(len(conns)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *RRClient) Next() *grpc.ClientConn {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
c.index = (c.index + 1) % len(c.conns)
|
||||
return c.conns[c.index]
|
||||
}
|
||||
@@ -1,50 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"zero/core/stat"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type (
|
||||
RegisterFn func(*grpc.Server)
|
||||
|
||||
Server interface {
|
||||
AddOptions(options ...grpc.ServerOption)
|
||||
AddStreamInterceptors(interceptors ...grpc.StreamServerInterceptor)
|
||||
AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor)
|
||||
SetName(string)
|
||||
Start(register RegisterFn) error
|
||||
}
|
||||
|
||||
baseRpcServer struct {
|
||||
address string
|
||||
metrics *stat.Metrics
|
||||
options []grpc.ServerOption
|
||||
streamInterceptors []grpc.StreamServerInterceptor
|
||||
unaryInterceptors []grpc.UnaryServerInterceptor
|
||||
}
|
||||
)
|
||||
|
||||
func newBaseRpcServer(address string, metrics *stat.Metrics) *baseRpcServer {
|
||||
return &baseRpcServer{
|
||||
address: address,
|
||||
metrics: metrics,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *baseRpcServer) AddOptions(options ...grpc.ServerOption) {
|
||||
s.options = append(s.options, options...)
|
||||
}
|
||||
|
||||
func (s *baseRpcServer) AddStreamInterceptors(interceptors ...grpc.StreamServerInterceptor) {
|
||||
s.streamInterceptors = append(s.streamInterceptors, interceptors...)
|
||||
}
|
||||
|
||||
func (s *baseRpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
|
||||
s.unaryInterceptors = append(s.unaryInterceptors, interceptors...)
|
||||
}
|
||||
|
||||
func (s *baseRpcServer) SetName(name string) {
|
||||
s.metrics.SetName(name)
|
||||
}
|
||||
@@ -1,43 +0,0 @@
|
||||
package serverinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime/debug"
|
||||
|
||||
"zero/core/logx"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func StreamCrashInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) (err error) {
|
||||
defer handleCrash(func(r interface{}) {
|
||||
err = toPanicError(r)
|
||||
})
|
||||
|
||||
return handler(srv, stream)
|
||||
}
|
||||
|
||||
func UnaryCrashInterceptor() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
defer handleCrash(func(r interface{}) {
|
||||
err = toPanicError(r)
|
||||
})
|
||||
|
||||
return handler(ctx, req)
|
||||
}
|
||||
}
|
||||
|
||||
func handleCrash(handler func(interface{})) {
|
||||
if r := recover(); r != nil {
|
||||
handler(r)
|
||||
}
|
||||
}
|
||||
|
||||
func toPanicError(r interface{}) error {
|
||||
logx.Errorf("%+v %s", r, debug.Stack())
|
||||
return status.Errorf(codes.Internal, "panic: %v", r)
|
||||
}
|
||||
@@ -1,45 +0,0 @@
|
||||
package serverinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"zero/core/metric"
|
||||
"zero/core/timex"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
const serverNamespace = "rpc_server"
|
||||
|
||||
var (
|
||||
metricServerReqDur = metric.NewHistogramVec(&metric.HistogramVecOpts{
|
||||
Namespace: serverNamespace,
|
||||
Subsystem: "requests",
|
||||
Name: "duration_ms",
|
||||
Help: "rpc server requests duration(ms).",
|
||||
Labels: []string{"method"},
|
||||
Buckets: []float64{5, 10, 25, 50, 100, 250, 500, 1000},
|
||||
})
|
||||
|
||||
metricServerReqCodeTotal = metric.NewCounterVec(&metric.CounterVecOpts{
|
||||
Namespace: serverNamespace,
|
||||
Subsystem: "requests",
|
||||
Name: "code_total",
|
||||
Help: "rpc server requests code count.",
|
||||
Labels: []string{"method", "code"},
|
||||
})
|
||||
)
|
||||
|
||||
func UnaryPromMetricInterceptor() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
startTime := timex.Now()
|
||||
resp, err := handler(ctx, req)
|
||||
metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)
|
||||
metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
|
||||
return resp, err
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,53 +0,0 @@
|
||||
package serverinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"zero/core/load"
|
||||
"zero/core/stat"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const serviceType = "rpc"
|
||||
|
||||
var (
|
||||
sheddingStat *load.SheddingStat
|
||||
lock sync.Mutex
|
||||
)
|
||||
|
||||
func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor {
|
||||
ensureSheddingStat()
|
||||
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (val interface{}, err error) {
|
||||
sheddingStat.IncrementTotal()
|
||||
var promise load.Promise
|
||||
promise, err = shedder.Allow()
|
||||
if err != nil {
|
||||
metrics.AddDrop()
|
||||
sheddingStat.IncrementDrop()
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err == context.DeadlineExceeded {
|
||||
promise.Fail()
|
||||
} else {
|
||||
sheddingStat.IncrementPass()
|
||||
promise.Pass()
|
||||
}
|
||||
}()
|
||||
|
||||
return handler(ctx, req)
|
||||
}
|
||||
}
|
||||
|
||||
func ensureSheddingStat() {
|
||||
lock.Lock()
|
||||
if sheddingStat == nil {
|
||||
sheddingStat = load.NewSheddingStat(serviceType)
|
||||
}
|
||||
lock.Unlock()
|
||||
}
|
||||
@@ -1,52 +0,0 @@
|
||||
package serverinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"zero/core/logx"
|
||||
"zero/core/stat"
|
||||
"zero/core/timex"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/peer"
|
||||
)
|
||||
|
||||
const serverSlowThreshold = time.Millisecond * 500
|
||||
|
||||
func UnaryStatInterceptor(metrics *stat.Metrics) grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
defer handleCrash(func(r interface{}) {
|
||||
err = toPanicError(r)
|
||||
})
|
||||
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
duration := timex.Since(startTime)
|
||||
metrics.Add(stat.Task{
|
||||
Duration: duration,
|
||||
})
|
||||
logDuration(ctx, info.FullMethod, req, duration)
|
||||
}()
|
||||
|
||||
return handler(ctx, req)
|
||||
}
|
||||
}
|
||||
|
||||
func logDuration(ctx context.Context, method string, req interface{}, duration time.Duration) {
|
||||
var addr string
|
||||
client, ok := peer.FromContext(ctx)
|
||||
if ok {
|
||||
addr = client.Addr.String()
|
||||
}
|
||||
content, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
logx.Errorf("%s - %s", addr, err.Error())
|
||||
} else if duration > serverSlowThreshold {
|
||||
logx.WithDuration(duration).Slowf("[RPC] slowcall - %s - %s - %s", addr, method, string(content))
|
||||
} else {
|
||||
logx.WithDuration(duration).Infof("%s - %s - %s", addr, method, string(content))
|
||||
}
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
package serverinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"zero/core/contextx"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func UnaryTimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
ctx, cancel := contextx.ShrinkDeadline(ctx, timeout)
|
||||
defer cancel()
|
||||
return handler(ctx, req)
|
||||
}
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
package serverinterceptors
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"zero/core/trace"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func UnaryTracingInterceptor(serviceName string) grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
carrier, err := trace.Extract(trace.GrpcFormat, md)
|
||||
if err != nil {
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
ctx, span := trace.StartServerSpan(ctx, carrier, serviceName, info.FullMethod)
|
||||
defer span.Finish()
|
||||
return handler(ctx, req)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user