feat: support third party orm to interact with go-zero (#1286)

* fixes #987

* chore: fix test failure

* chore: add comments

* feat: support third party orm to interact with go-zero

* chore: refactor
This commit is contained in:
Kevin Wan
2021-12-01 20:22:15 +08:00
committed by GitHub
parent 543d590710
commit 9d528dddd6
24 changed files with 55 additions and 38 deletions

View File

@@ -0,0 +1,34 @@
package internal
import (
"strings"
"google.golang.org/grpc/resolver"
)
type directBuilder struct{}
func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
var addrs []resolver.Address
endpoints := strings.FieldsFunc(target.Endpoint, func(r rune) bool {
return r == EndpointSepChar
})
for _, val := range subset(endpoints, subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
if err := cc.UpdateState(resolver.State{
Addresses: addrs,
}); err != nil {
return nil, err
}
return &nopResolver{cc: cc}, nil
}
func (d *directBuilder) Scheme() string {
return DirectScheme
}

View File

@@ -0,0 +1,53 @@
package internal
import (
"fmt"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/mathx"
"google.golang.org/grpc/resolver"
)
func TestDirectBuilder_Build(t *testing.T) {
tests := []int{
0,
1,
2,
subsetSize / 2,
subsetSize,
subsetSize * 2,
}
for _, test := range tests {
test := test
t.Run(strconv.Itoa(test), func(t *testing.T) {
var servers []string
for i := 0; i < test; i++ {
servers = append(servers, fmt.Sprintf("localhost:%d", i))
}
var b directBuilder
cc := new(mockedClientConn)
_, err := b.Build(resolver.Target{
Scheme: DirectScheme,
Endpoint: strings.Join(servers, ","),
}, cc, resolver.BuildOptions{})
assert.Nil(t, err)
size := mathx.MinInt(test, subsetSize)
assert.Equal(t, size, len(cc.state.Addresses))
m := make(map[string]lang.PlaceholderType)
for _, each := range cc.state.Addresses {
m[each.Addr] = lang.Placeholder
}
assert.Equal(t, size, len(m))
})
}
}
func TestDirectBuilder_Scheme(t *testing.T) {
var b directBuilder
assert.Equal(t, DirectScheme, b.Scheme())
}

View File

@@ -0,0 +1,44 @@
package internal
import (
"strings"
"github.com/tal-tech/go-zero/core/discov"
"github.com/tal-tech/go-zero/core/logx"
"google.golang.org/grpc/resolver"
)
type discovBuilder struct{}
func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
return r == EndpointSepChar
})
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}
update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
if err := cc.UpdateState(resolver.State{
Addresses: addrs,
}); err != nil {
logx.Error(err)
}
}
sub.AddListener(update)
update()
return &nopResolver{cc: cc}, nil
}
func (b *discovBuilder) Scheme() string {
return DiscovScheme
}

View File

@@ -0,0 +1,12 @@
package internal
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestDiscovBuilder_Scheme(t *testing.T) {
var b discovBuilder
assert.Equal(t, DiscovScheme, b.Scheme())
}

View File

@@ -0,0 +1,9 @@
package internal
type etcdBuilder struct {
discovBuilder
}
func (b *etcdBuilder) Scheme() string {
return EtcdScheme
}

View File

@@ -0,0 +1,9 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: endpoints-reader
rules:
- apiGroups: [""]
resources: ["endpoints"]
verbs: ["get", "watch", "list"]

View File

@@ -0,0 +1,12 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: endpoints-reader
subjects:
- kind: ServiceAccount
name: endpoints-reader
namespace: kevin # the namespace that the ServiceAccount resides in
roleRef:
kind: ClusterRole
name: endpoints-reader
apiGroup: rbac.authorization.k8s.io

View File

@@ -0,0 +1,5 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: endpoints-reader
namespace: kevin # the namespace to create the ServiceAccount

View File

@@ -0,0 +1,139 @@
package kube
import (
"sync"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
v1 "k8s.io/api/core/v1"
)
// EventHandler is ResourceEventHandler implementation.
type EventHandler struct {
update func([]string)
endpoints map[string]lang.PlaceholderType
lock sync.Mutex
}
// NewEventHandler returns an EventHandler.
func NewEventHandler(update func([]string)) *EventHandler {
return &EventHandler{
update: update,
endpoints: make(map[string]lang.PlaceholderType),
}
}
// OnAdd handles the endpoints add events.
func (h *EventHandler) OnAdd(obj interface{}) {
endpoints, ok := obj.(*v1.Endpoints)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
return
}
h.lock.Lock()
defer h.lock.Unlock()
var changed bool
for _, sub := range endpoints.Subsets {
for _, point := range sub.Addresses {
if _, ok := h.endpoints[point.IP]; !ok {
h.endpoints[point.IP] = lang.Placeholder
changed = true
}
}
}
if changed {
h.notify()
}
}
// OnDelete handles the endpoints delete events.
func (h *EventHandler) OnDelete(obj interface{}) {
endpoints, ok := obj.(*v1.Endpoints)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
return
}
h.lock.Lock()
defer h.lock.Unlock()
var changed bool
for _, sub := range endpoints.Subsets {
for _, point := range sub.Addresses {
if _, ok := h.endpoints[point.IP]; ok {
delete(h.endpoints, point.IP)
changed = true
}
}
}
if changed {
h.notify()
}
}
// OnUpdate handles the endpoints update events.
func (h *EventHandler) OnUpdate(oldObj, newObj interface{}) {
oldEndpoints, ok := oldObj.(*v1.Endpoints)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", oldObj)
return
}
newEndpoints, ok := newObj.(*v1.Endpoints)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", newObj)
return
}
if oldEndpoints.ResourceVersion == newEndpoints.ResourceVersion {
return
}
h.Update(newEndpoints)
}
// Update updates the endpoints.
func (h *EventHandler) Update(endpoints *v1.Endpoints) {
h.lock.Lock()
defer h.lock.Unlock()
old := h.endpoints
h.endpoints = make(map[string]lang.PlaceholderType)
for _, sub := range endpoints.Subsets {
for _, point := range sub.Addresses {
h.endpoints[point.IP] = lang.Placeholder
}
}
if diff(old, h.endpoints) {
h.notify()
}
}
func (h *EventHandler) notify() {
var targets []string
for k := range h.endpoints {
targets = append(targets, k)
}
h.update(targets)
}
func diff(o, n map[string]lang.PlaceholderType) bool {
if len(o) != len(n) {
return true
}
for k := range o {
if _, ok := n[k]; !ok {
return true
}
}
return false
}

View File

@@ -0,0 +1,276 @@
package kube
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestAdd(t *testing.T) {
var endpoints []string
h := NewEventHandler(func(change []string) {
endpoints = change
})
h.OnAdd("bad")
h.OnAdd(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
{
IP: "0.0.0.3",
},
},
},
}})
assert.ElementsMatch(t, []string{"0.0.0.1", "0.0.0.2", "0.0.0.3"}, endpoints)
}
func TestDelete(t *testing.T) {
var endpoints []string
h := NewEventHandler(func(change []string) {
endpoints = change
})
h.OnAdd(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
{
IP: "0.0.0.3",
},
},
},
}})
h.OnDelete("bad")
h.OnDelete(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
}})
assert.ElementsMatch(t, []string{"0.0.0.3"}, endpoints)
}
func TestUpdate(t *testing.T) {
var endpoints []string
h := NewEventHandler(func(change []string) {
endpoints = change
})
h.OnUpdate(&v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}, &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
{
IP: "0.0.0.3",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
})
assert.ElementsMatch(t, []string{"0.0.0.1", "0.0.0.2", "0.0.0.3"}, endpoints)
}
func TestUpdateNoChange(t *testing.T) {
h := NewEventHandler(func(change []string) {
assert.Fail(t, "should not called")
})
h.OnUpdate(&v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}, &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
})
}
func TestUpdateChangeWithDifferentVersion(t *testing.T) {
var endpoints []string
h := NewEventHandler(func(change []string) {
endpoints = change
})
h.OnAdd(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.3",
},
},
},
}})
h.OnUpdate(&v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.3",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}, &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
})
assert.ElementsMatch(t, []string{"0.0.0.1", "0.0.0.2"}, endpoints)
}
func TestUpdateNoChangeWithDifferentVersion(t *testing.T) {
var endpoints []string
h := NewEventHandler(func(change []string) {
endpoints = change
})
h.OnAdd(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
}})
h.OnUpdate("bad", &v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
},
},
}})
h.OnUpdate(&v1.Endpoints{Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
},
},
}}, "bad")
h.OnUpdate(&v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "1",
},
}, &v1.Endpoints{
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "0.0.0.1",
},
{
IP: "0.0.0.2",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "2",
},
})
assert.ElementsMatch(t, []string{"0.0.0.1", "0.0.0.2"}, endpoints)
}

View File

@@ -0,0 +1,47 @@
package kube
import (
"fmt"
"strconv"
"strings"
"google.golang.org/grpc/resolver"
)
const (
colon = ":"
defaultNamespace = "default"
)
var emptyService Service
// Service represents a service with namespace, name and port.
type Service struct {
Namespace string
Name string
Port int
}
// ParseTarget parses the resolver.Target.
func ParseTarget(target resolver.Target) (Service, error) {
var service Service
service.Namespace = target.Authority
if len(service.Namespace) == 0 {
service.Namespace = defaultNamespace
}
segs := strings.SplitN(target.Endpoint, colon, 2)
if len(segs) < 2 {
return emptyService, fmt.Errorf("bad endpoint: %s", target.Endpoint)
}
service.Name = segs[0]
port, err := strconv.Atoi(segs[1])
if err != nil {
return emptyService, err
}
service.Port = port
return service, nil
}

View File

@@ -0,0 +1,83 @@
package kube
import (
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/resolver"
)
func TestParseTarget(t *testing.T) {
tests := []struct {
name string
input resolver.Target
expect Service
hasErr bool
}{
{
name: "normal case",
input: resolver.Target{
Scheme: "k8s",
Authority: "ns1",
Endpoint: "my-svc:8080",
},
expect: Service{
Namespace: "ns1",
Name: "my-svc",
Port: 8080,
},
},
{
name: "normal case",
input: resolver.Target{
Scheme: "k8s",
Authority: "",
Endpoint: "my-svc:8080",
},
expect: Service{
Namespace: defaultNamespace,
Name: "my-svc",
Port: 8080,
},
},
{
name: "no port",
input: resolver.Target{
Scheme: "k8s",
Authority: "ns1",
Endpoint: "my-svc:",
},
hasErr: true,
},
{
name: "no port, no colon",
input: resolver.Target{
Scheme: "k8s",
Authority: "ns1",
Endpoint: "my-svc",
},
hasErr: true,
},
{
name: "bad port",
input: resolver.Target{
Scheme: "k8s",
Authority: "ns1",
Endpoint: "my-svc:800a",
},
hasErr: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
svc, err := ParseTarget(test.input)
if test.hasErr {
assert.NotNil(t, err)
} else {
assert.Nil(t, err)
assert.Equal(t, test.expect, svc)
}
})
}
}

View File

@@ -0,0 +1,80 @@
package internal
import (
"context"
"fmt"
"time"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/threading"
"github.com/tal-tech/go-zero/zrpc/resolver/internal/kube"
"google.golang.org/grpc/resolver"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
const (
resyncInterval = 5 * time.Minute
nameSelector = "metadata.name="
)
type kubeBuilder struct{}
func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn,
opts resolver.BuildOptions) (resolver.Resolver, error) {
svc, err := kube.ParseTarget(target)
if err != nil {
return nil, err
}
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
cs, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
handler := kube.NewEventHandler(func(endpoints []string) {
var addrs []resolver.Address
for _, val := range subset(endpoints, subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: fmt.Sprintf("%s:%d", val, svc.Port),
})
}
if err := cc.UpdateState(resolver.State{
Addresses: addrs,
}); err != nil {
logx.Error(err)
}
})
inf := informers.NewSharedInformerFactoryWithOptions(cs, resyncInterval,
informers.WithNamespace(svc.Namespace),
informers.WithTweakListOptions(func(options *v1.ListOptions) {
options.FieldSelector = nameSelector + svc.Name
}))
in := inf.Core().V1().Endpoints()
in.Informer().AddEventHandler(handler)
threading.GoSafe(func() {
inf.Start(proc.Done())
})
endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(context.Background(), svc.Name, v1.GetOptions{})
if err != nil {
return nil, err
}
handler.Update(endpoints)
return &nopResolver{cc: cc}, nil
}
func (b *kubeBuilder) Scheme() string {
return KubernetesScheme
}

View File

@@ -0,0 +1,12 @@
package internal
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestKubeBuilder_Scheme(t *testing.T) {
var b kubeBuilder
assert.Equal(t, KubernetesScheme, b.Scheme())
}

View File

@@ -0,0 +1,50 @@
package internal
import (
"fmt"
"google.golang.org/grpc/resolver"
)
const (
// DirectScheme stands for direct scheme.
DirectScheme = "direct"
// DiscovScheme stands for discov scheme.
DiscovScheme = "discov"
// EtcdScheme stands for etcd scheme.
EtcdScheme = "etcd"
// KubernetesScheme stands for k8s scheme.
KubernetesScheme = "k8s"
// EndpointSepChar is the separator cha in endpoints.
EndpointSepChar = ','
subsetSize = 32
)
var (
// EndpointSep is the separator string in endpoints.
EndpointSep = fmt.Sprintf("%c", EndpointSepChar)
directResolverBuilder directBuilder
discovResolverBuilder discovBuilder
etcdResolverBuilder etcdBuilder
k8sResolverBuilder kubeBuilder
)
// RegisterResolver registers the direct and discov schemes to the resolver.
func RegisterResolver() {
resolver.Register(&directResolverBuilder)
resolver.Register(&discovResolverBuilder)
resolver.Register(&etcdResolverBuilder)
resolver.Register(&k8sResolverBuilder)
}
type nopResolver struct {
cc resolver.ClientConn
}
func (r *nopResolver) Close() {
}
func (r *nopResolver) ResolveNow(options resolver.ResolveNowOptions) {
}

View File

@@ -0,0 +1,37 @@
package internal
import (
"testing"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
func TestNopResolver(t *testing.T) {
// make sure ResolveNow & Close don't panic
var r nopResolver
r.ResolveNow(resolver.ResolveNowOptions{})
r.Close()
}
type mockedClientConn struct {
state resolver.State
}
func (m *mockedClientConn) UpdateState(state resolver.State) error {
m.state = state
return nil
}
func (m *mockedClientConn) ReportError(err error) {
}
func (m *mockedClientConn) NewAddress(addresses []resolver.Address) {
}
func (m *mockedClientConn) NewServiceConfig(serviceConfig string) {
}
func (m *mockedClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
return nil
}

View File

@@ -0,0 +1,14 @@
package internal
import "math/rand"
func subset(set []string, sub int) []string {
rand.Shuffle(len(set), func(i, j int) {
set[i], set[j] = set[j], set[i]
})
if len(set) <= sub {
return set
}
return set[:sub]
}

View File

@@ -0,0 +1,54 @@
package internal
import (
"strconv"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/mathx"
)
func TestSubset(t *testing.T) {
tests := []struct {
name string
set int
sub int
}{
{
name: "more vals to subset",
set: 100,
sub: 36,
},
{
name: "less vals to subset",
set: 100,
sub: 200,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
var vals []string
for i := 0; i < test.set; i++ {
vals = append(vals, strconv.Itoa(i))
}
m := make(map[interface{}]int)
for i := 0; i < 1000; i++ {
set := subset(append([]string(nil), vals...), test.sub)
if test.sub < test.set {
assert.Equal(t, test.sub, len(set))
} else {
assert.Equal(t, test.set, len(set))
}
for _, val := range set {
m[val]++
}
}
assert.True(t, mathx.CalcEntropy(m) > 0.95)
})
}
}

11
zrpc/resolver/register.go Normal file
View File

@@ -0,0 +1,11 @@
package resolver
import (
"github.com/tal-tech/go-zero/zrpc/resolver/internal"
)
// Register registers schemes defined zrpc.
// Keep it in a separated package to let third party register manually.
func Register() {
internal.RegisterResolver()
}

20
zrpc/resolver/target.go Normal file
View File

@@ -0,0 +1,20 @@
package resolver
import (
"fmt"
"strings"
"github.com/tal-tech/go-zero/zrpc/resolver/internal"
)
// BuildDirectTarget returns a string that represents the given endpoints with direct schema.
func BuildDirectTarget(endpoints []string) string {
return fmt.Sprintf("%s:///%s", internal.DirectScheme,
strings.Join(endpoints, internal.EndpointSep))
}
// BuildDiscovTarget returns a string that represents the given endpoints with discov schema.
func BuildDiscovTarget(endpoints []string, key string) string {
return fmt.Sprintf("%s://%s/%s", internal.DiscovScheme,
strings.Join(endpoints, internal.EndpointSep), key)
}

View File

@@ -0,0 +1,17 @@
package resolver
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestBuildDirectTarget(t *testing.T) {
target := BuildDirectTarget([]string{"localhost:123", "localhost:456"})
assert.Equal(t, "direct:///localhost:123,localhost:456", target)
}
func TestBuildDiscovTarget(t *testing.T) {
target := BuildDiscovTarget([]string{"localhost:123", "localhost:456"}, "foo")
assert.Equal(t, "discov://localhost:123,localhost:456/foo", target)
}