test: add more tests (#1604)
This commit is contained in:
@@ -12,8 +12,8 @@ var (
|
|||||||
ErrNotFound = mgo.ErrNotFound
|
ErrNotFound = mgo.ErrNotFound
|
||||||
|
|
||||||
// can't use one SingleFlight per conn, because multiple conns may share the same cache key.
|
// can't use one SingleFlight per conn, because multiple conns may share the same cache key.
|
||||||
sharedCalls = syncx.NewSingleFlight()
|
singleFlight = syncx.NewSingleFlight()
|
||||||
stats = cache.NewStat("mongoc")
|
stats = cache.NewStat("mongoc")
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ func TestCollection_Count(t *testing.T) {
|
|||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
defer clean()
|
defer clean()
|
||||||
|
|
||||||
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
cach := cache.NewNode(r, singleFlight, stats, mgo.ErrNotFound)
|
||||||
c := newCollection(dummyConn{}, cach)
|
c := newCollection(dummyConn{}, cach)
|
||||||
val, err := c.Count("any")
|
val, err := c.Count("any")
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
@@ -98,7 +98,7 @@ func TestStat(t *testing.T) {
|
|||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
defer clean()
|
defer clean()
|
||||||
|
|
||||||
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
cach := cache.NewNode(r, singleFlight, stats, mgo.ErrNotFound)
|
||||||
c := newCollection(dummyConn{}, cach).(*cachedCollection)
|
c := newCollection(dummyConn{}, cach).(*cachedCollection)
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@@ -121,7 +121,7 @@ func TestStatCacheFails(t *testing.T) {
|
|||||||
defer log.SetOutput(os.Stdout)
|
defer log.SetOutput(os.Stdout)
|
||||||
|
|
||||||
r := redis.New("localhost:59999")
|
r := redis.New("localhost:59999")
|
||||||
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
cach := cache.NewNode(r, singleFlight, stats, mgo.ErrNotFound)
|
||||||
c := newCollection(dummyConn{}, cach)
|
c := newCollection(dummyConn{}, cach)
|
||||||
|
|
||||||
for i := 0; i < 20; i++ {
|
for i := 0; i < 20; i++ {
|
||||||
@@ -142,7 +142,7 @@ func TestStatDbFails(t *testing.T) {
|
|||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
defer clean()
|
defer clean()
|
||||||
|
|
||||||
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
cach := cache.NewNode(r, singleFlight, stats, mgo.ErrNotFound)
|
||||||
c := newCollection(dummyConn{}, cach).(*cachedCollection)
|
c := newCollection(dummyConn{}, cach).(*cachedCollection)
|
||||||
|
|
||||||
for i := 0; i < 20; i++ {
|
for i := 0; i < 20; i++ {
|
||||||
@@ -164,7 +164,7 @@ func TestStatFromMemory(t *testing.T) {
|
|||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
defer clean()
|
defer clean()
|
||||||
|
|
||||||
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
|
cach := cache.NewNode(r, singleFlight, stats, mgo.ErrNotFound)
|
||||||
c := newCollection(dummyConn{}, cach).(*cachedCollection)
|
c := newCollection(dummyConn{}, cach).(*cachedCollection)
|
||||||
|
|
||||||
var all sync.WaitGroup
|
var all sync.WaitGroup
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ func MustNewModel(url, collection string, c cache.CacheConf, opts ...cache.Optio
|
|||||||
|
|
||||||
// NewModel returns a Model with a cache cluster.
|
// NewModel returns a Model with a cache cluster.
|
||||||
func NewModel(url, collection string, conf cache.CacheConf, opts ...cache.Option) (*Model, error) {
|
func NewModel(url, collection string, conf cache.CacheConf, opts ...cache.Option) (*Model, error) {
|
||||||
c := cache.New(conf, sharedCalls, stats, mgo.ErrNotFound, opts...)
|
c := cache.New(conf, singleFlight, stats, mgo.ErrNotFound, opts...)
|
||||||
return NewModelWithCache(url, collection, c)
|
return NewModelWithCache(url, collection, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,7 +51,7 @@ func NewModelWithCache(url, collection string, c cache.Cache) (*Model, error) {
|
|||||||
|
|
||||||
// NewNodeModel returns a Model with a cache node.
|
// NewNodeModel returns a Model with a cache node.
|
||||||
func NewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Option) (*Model, error) {
|
func NewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Option) (*Model, error) {
|
||||||
c := cache.NewNode(rds, sharedCalls, stats, mgo.ErrNotFound, opts...)
|
c := cache.NewNode(rds, singleFlight, stats, mgo.ErrNotFound, opts...)
|
||||||
return NewModelWithCache(url, collection, c)
|
return NewModelWithCache(url, collection, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -12,20 +12,20 @@ import (
|
|||||||
|
|
||||||
// A RpcProxy is a rpc proxy.
|
// A RpcProxy is a rpc proxy.
|
||||||
type RpcProxy struct {
|
type RpcProxy struct {
|
||||||
backend string
|
backend string
|
||||||
clients map[string]Client
|
clients map[string]Client
|
||||||
options []internal.ClientOption
|
options []internal.ClientOption
|
||||||
sharedCalls syncx.SingleFlight
|
singleFlight syncx.SingleFlight
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewProxy returns a RpcProxy.
|
// NewProxy returns a RpcProxy.
|
||||||
func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy {
|
func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy {
|
||||||
return &RpcProxy{
|
return &RpcProxy{
|
||||||
backend: backend,
|
backend: backend,
|
||||||
clients: make(map[string]Client),
|
clients: make(map[string]Client),
|
||||||
options: opts,
|
options: opts,
|
||||||
sharedCalls: syncx.NewSingleFlight(),
|
singleFlight: syncx.NewSingleFlight(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -33,7 +33,7 @@ func NewProxy(backend string, opts ...internal.ClientOption) *RpcProxy {
|
|||||||
func (p *RpcProxy) TakeConn(ctx context.Context) (*grpc.ClientConn, error) {
|
func (p *RpcProxy) TakeConn(ctx context.Context) (*grpc.ClientConn, error) {
|
||||||
cred := auth.ParseCredential(ctx)
|
cred := auth.ParseCredential(ctx)
|
||||||
key := cred.App + "/" + cred.Token
|
key := cred.App + "/" + cred.Token
|
||||||
val, err := p.sharedCalls.Do(key, func() (interface{}, error) {
|
val, err := p.singleFlight.Do(key, func() (interface{}, error) {
|
||||||
p.lock.Lock()
|
p.lock.Lock()
|
||||||
client, ok := p.clients[key]
|
client, ok := p.clients[key]
|
||||||
p.lock.Unlock()
|
p.lock.Unlock()
|
||||||
|
|||||||
@@ -64,3 +64,9 @@ func TestProxy(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRpcProxy_TakeConnNewClientFailed(t *testing.T) {
|
||||||
|
proxy := NewProxy("foo", WithDialOption(grpc.WithInsecure()), WithDialOption(grpc.WithBlock()))
|
||||||
|
_, err := proxy.TakeConn(context.Background())
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
}
|
||||||
|
|||||||
@@ -101,6 +101,21 @@ func TestServer_HasEtcd(t *testing.T) {
|
|||||||
srv.Stop()
|
srv.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServer_StartFailed(t *testing.T) {
|
||||||
|
srv := MustNewServer(RpcServerConf{
|
||||||
|
ServiceConf: service.ServiceConf{
|
||||||
|
Log: logx.LogConf{
|
||||||
|
ServiceName: "foo",
|
||||||
|
Mode: "console",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ListenOn: "localhost:aaa",
|
||||||
|
}, func(server *grpc.Server) {
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.Panics(t, srv.Start)
|
||||||
|
}
|
||||||
|
|
||||||
type mockedServer struct {
|
type mockedServer struct {
|
||||||
unaryInterceptors []grpc.UnaryServerInterceptor
|
unaryInterceptors []grpc.UnaryServerInterceptor
|
||||||
streamInterceptors []grpc.StreamServerInterceptor
|
streamInterceptors []grpc.StreamServerInterceptor
|
||||||
|
|||||||
Reference in New Issue
Block a user