implement k8s service discovery (#988)

* implement k8s service discovery

* simplify code

* use default namespace if not provided

* disable codecov bot comment

* ignore adhoc dir

* simplify building target in NewClient

* reformat code

* Fix filepath (#990)

* format code, and reorg imports (#991)

* add more unit test

Co-authored-by: anqiansong <anqiansong@gmail.com>
This commit is contained in:
Kevin Wan
2021-09-04 10:27:08 +08:00
committed by GitHub
parent 0325d8e92d
commit 20f665ede8
19 changed files with 979 additions and 41 deletions

View File

@@ -55,13 +55,21 @@ func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
}
opts = append(opts, options...)
var client Client
var target string
var err error
if len(c.Endpoints) > 0 {
client, err = internal.NewClient(internal.BuildDirectTarget(c.Endpoints), opts...)
} else if err = c.Etcd.Validate(); err == nil {
client, err = internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
target = internal.BuildDirectTarget(c.Endpoints)
} else if len(c.Target) > 0 {
target = c.Target
} else {
if err = c.Etcd.Validate(); err != nil {
return nil, err
}
target = internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key)
}
client, err := internal.NewClient(target, opts...)
if err != nil {
return nil, err
}

View File

@@ -23,7 +23,8 @@ type (
// A RpcClientConf is a rpc client config.
RpcClientConf struct {
Etcd discov.EtcdConf `json:",optional"`
Endpoints []string `json:",optional=!Etcd"`
Endpoints []string `json:",optional"`
Target string `json:",optional"`
App string `json:",optional"`
Token string `json:",optional"`
Timeout int64 `json:",default=2000"`

View File

@@ -20,9 +20,11 @@ func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn, op
Addr: val,
})
}
cc.UpdateState(resolver.State{
if err := cc.UpdateState(resolver.State{
Addresses: addrs,
})
}); err != nil {
return nil, err
}
return &nopResolver{cc: cc}, nil
}

View File

@@ -4,12 +4,13 @@ import (
"strings"
"github.com/tal-tech/go-zero/core/discov"
"github.com/tal-tech/go-zero/core/logx"
"google.golang.org/grpc/resolver"
)
type discovBuilder struct{}
func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
return r == EndpointSepChar
@@ -26,9 +27,11 @@ func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, op
Addr: val,
})
}
cc.UpdateState(resolver.State{
if err := cc.UpdateState(resolver.State{
Addresses: addrs,
})
}); err != nil {
logx.Error(err)
}
}
sub.AddListener(update)
update()
@@ -36,6 +39,6 @@ func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, op
return &nopResolver{cc: cc}, nil
}
func (d *discovBuilder) Scheme() string {
func (b *discovBuilder) Scheme() string {
return DiscovScheme
}

View File

@@ -0,0 +1,12 @@
package resolver
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestDiscovBuilder_Scheme(t *testing.T) {
var b discovBuilder
assert.Equal(t, DiscovScheme, b.Scheme())
}

View File

@@ -0,0 +1,133 @@
package kube
import (
"sync"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
v1 "k8s.io/api/core/v1"
)
type EventHandler struct {
update func([]string)
endpoints map[string]lang.PlaceholderType
lock sync.Mutex
}
func NewEventHandler(update func([]string)) *EventHandler {
return &EventHandler{
update: update,
endpoints: make(map[string]lang.PlaceholderType),
}
}
func (h *EventHandler) OnAdd(obj interface{}) {
endpoints, ok := obj.(*v1.Endpoints)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
return
}
h.lock.Lock()
defer h.lock.Unlock()
var changed bool
for _, sub := range endpoints.Subsets {
for _, point := range sub.Addresses {
if _, ok := h.endpoints[point.IP]; !ok {
h.endpoints[point.IP] = lang.Placeholder
changed = true
}
}
}
if changed {
h.notify()
}
}
func (h *EventHandler) OnDelete(obj interface{}) {
endpoints, ok := obj.(*v1.Endpoints)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
return
}
h.lock.Lock()
defer h.lock.Unlock()
var changed bool
for _, sub := range endpoints.Subsets {
for _, point := range sub.Addresses {
if _, ok := h.endpoints[point.IP]; ok {
delete(h.endpoints, point.IP)
changed = true
}
}
}
if changed {
h.notify()
}
}
func (h *EventHandler) OnUpdate(oldObj, newObj interface{}) {
oldEndpoints, ok := oldObj.(*v1.Endpoints)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", oldObj)
return
}
newEndpoints, ok := newObj.(*v1.Endpoints)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", newObj)
return
}
if oldEndpoints.ResourceVersion == newEndpoints.ResourceVersion {
return
}
h.Update(newEndpoints)
}
func (h *EventHandler) Update(endpoints *v1.Endpoints) {
h.lock.Lock()
defer h.lock.Unlock()
old := h.endpoints
h.endpoints = make(map[string]lang.PlaceholderType)
for _, sub := range endpoints.Subsets {
for _, point := range sub.Addresses {
h.endpoints[point.IP] = lang.Placeholder
}
}
if diff(old, h.endpoints) {
h.notify()
}
}
func (h *EventHandler) notify() {
var targets []string
for k := range h.endpoints {
targets = append(targets, k)
}
h.update(targets)
}
func diff(o, n map[string]lang.PlaceholderType) bool {
if len(o) != len(n) {
return true
}
for k := range o {
if _, ok := n[k]; !ok {
return true
}
}
return false
}

View File

@@ -0,0 +1,276 @@
package kube
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestAdd(t *testing.T) {
var endpoints []string
h := NewEventHandler(func(change []string) {
endpoints = change
})
h.OnAdd("bad")
h.OnAdd(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
{
IP: "0.0.0.3",
},
},
},
}})
assert.ElementsMatch(t, []string{"0.0.0.1", "0.0.0.2", "0.0.0.3"}, endpoints)
}
func TestDelete(t *testing.T) {
var endpoints []string
h := NewEventHandler(func(change []string) {
endpoints = change
})
h.OnAdd(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
{
IP: "0.0.0.3",
},
},
},
}})
h.OnDelete("bad")
h.OnDelete(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
}})
assert.ElementsMatch(t, []string{"0.0.0.3"}, endpoints)
}
func TestUpdate(t *testing.T) {
var endpoints []string
h := NewEventHandler(func(change []string) {
endpoints = change
})
h.OnUpdate(&v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}, &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
{
IP: "0.0.0.3",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
})
assert.ElementsMatch(t, []string{"0.0.0.1", "0.0.0.2", "0.0.0.3"}, endpoints)
}
func TestUpdateNoChange(t *testing.T) {
h := NewEventHandler(func(change []string) {
assert.Fail(t, "should not called")
})
h.OnUpdate(&v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}, &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
})
}
func TestUpdateChangeWithDifferentVersion(t *testing.T) {
var endpoints []string
h := NewEventHandler(func(change []string) {
endpoints = change
})
h.OnAdd(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.3",
},
},
},
}})
h.OnUpdate(&v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.3",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}, &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
})
assert.ElementsMatch(t, []string{"0.0.0.1", "0.0.0.2"}, endpoints)
}
func TestUpdateNoChangeWithDifferentVersion(t *testing.T) {
var endpoints []string
h := NewEventHandler(func(change []string) {
endpoints = change
})
h.OnAdd(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
}})
h.OnUpdate("bad", &v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
},
},
}})
h.OnUpdate(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
},
},
}}, "bad")
h.OnUpdate(&v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}, &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
})
assert.ElementsMatch(t, []string{"0.0.0.1", "0.0.0.2"}, endpoints)
}

View File

@@ -0,0 +1,45 @@
package kube
import (
"fmt"
"strconv"
"strings"
"google.golang.org/grpc/resolver"
)
const (
colon = ":"
defaultNamespace = "default"
)
var emptyService Service
type Service struct {
Namespace string
Name string
Port int
}
func ParseTarget(target resolver.Target) (Service, error) {
var service Service
service.Namespace = target.Authority
if len(service.Namespace) == 0 {
service.Namespace = defaultNamespace
}
segs := strings.SplitN(target.Endpoint, colon, 2)
if len(segs) < 2 {
return emptyService, fmt.Errorf("bad endpoint: %s", target.Endpoint)
}
service.Name = segs[0]
port, err := strconv.Atoi(segs[1])
if err != nil {
return emptyService, err
}
service.Port = port
return service, nil
}

View File

@@ -0,0 +1,83 @@
package kube
import (
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/resolver"
)
func TestParseTarget(t *testing.T) {
tests := []struct {
name string
input resolver.Target
expect Service
hasErr bool
}{
{
name: "normal case",
input: resolver.Target{
Scheme: "k8s",
Authority: "ns1",
Endpoint: "my-svc:8080",
},
expect: Service{
Namespace: "ns1",
Name: "my-svc",
Port: 8080,
},
},
{
name: "normal case",
input: resolver.Target{
Scheme: "k8s",
Authority: "",
Endpoint: "my-svc:8080",
},
expect: Service{
Namespace: defaultNamespace,
Name: "my-svc",
Port: 8080,
},
},
{
name: "no port",
input: resolver.Target{
Scheme: "k8s",
Authority: "ns1",
Endpoint: "my-svc:",
},
hasErr: true,
},
{
name: "no port, no colon",
input: resolver.Target{
Scheme: "k8s",
Authority: "ns1",
Endpoint: "my-svc",
},
hasErr: true,
},
{
name: "bad port",
input: resolver.Target{
Scheme: "k8s",
Authority: "ns1",
Endpoint: "my-svc:800a",
},
hasErr: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
svc, err := ParseTarget(test.input)
if test.hasErr {
assert.NotNil(t, err)
} else {
assert.Nil(t, err)
assert.Equal(t, test.expect, svc)
}
})
}
}

View File

@@ -0,0 +1,80 @@
package resolver
import (
"context"
"fmt"
"time"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/threading"
"github.com/tal-tech/go-zero/zrpc/internal/resolver/kube"
"google.golang.org/grpc/resolver"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
const (
resyncInterval = 5 * time.Minute
nameSelector = "metadata.name="
)
type kubeBuilder struct{}
func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn,
opts resolver.BuildOptions) (resolver.Resolver, error) {
svc, err := kube.ParseTarget(target)
if err != nil {
return nil, err
}
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
cs, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
handler := kube.NewEventHandler(func(endpoints []string) {
var addrs []resolver.Address
for _, val := range subset(endpoints, subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: fmt.Sprintf("%s:%d", val, svc.Port),
})
}
if err := cc.UpdateState(resolver.State{
Addresses: addrs,
}); err != nil {
logx.Error(err)
}
})
inf := informers.NewSharedInformerFactoryWithOptions(cs, resyncInterval,
informers.WithNamespace(svc.Namespace),
informers.WithTweakListOptions(func(options *v1.ListOptions) {
options.FieldSelector = nameSelector + svc.Name
}))
in := inf.Core().V1().Endpoints()
in.Informer().AddEventHandler(handler)
threading.GoSafe(func() {
inf.Start(proc.Done())
})
endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(context.Background(), svc.Name, v1.GetOptions{})
if err != nil {
return nil, err
}
handler.Update(endpoints)
return &nopResolver{cc: cc}, nil
}
func (b *kubeBuilder) Scheme() string {
return KubernetesScheme
}

View File

@@ -0,0 +1,12 @@
package resolver
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestKubeBuilder_Scheme(t *testing.T) {
var b kubeBuilder
assert.Equal(t, KubernetesScheme, b.Scheme())
}

View File

@@ -11,6 +11,8 @@ const (
DirectScheme = "direct"
// DiscovScheme stands for discov scheme.
DiscovScheme = "discov"
// KubernetesScheme stands for k8s scheme.
KubernetesScheme = "k8s"
// EndpointSepChar is the separator cha in endpoints.
EndpointSepChar = ','
@@ -23,12 +25,14 @@ var (
dirBuilder directBuilder
disBuilder discovBuilder
k8sBuilder kubeBuilder
)
// RegisterResolver registers the direct and discov schemes to the resolver.
func RegisterResolver() {
resolver.Register(&dirBuilder)
resolver.Register(&disBuilder)
resolver.Register(&k8sBuilder)
}
type nopResolver struct {

View File

@@ -14,7 +14,8 @@ const (
)
// NewRpcPubServer returns a Server.
func NewRpcPubServer(etcdEndpoints []string, etcdKey, listenOn string, opts ...ServerOption) (Server, error) {
func NewRpcPubServer(etcdEndpoints []string, etcdKey, listenOn string,
opts ...ServerOption) (Server, error) {
registerEtcd := func() error {
pubListenOn := figureOutListenOn(listenOn)
pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, pubListenOn)