feat: restful -> grpc gateway (#2155)
* Revert "chore: remove unimplemented gateway (#2139)"
This reverts commit d70e73ec66.
* feat: working gateway
* feat: use mr to make it faster
* feat: working gateway
* chore: add comments
* feat: support protoset besides reflection
* feat: support zrpc client conf
* docs: update readme
* feat: support grpc-metadata- header to gateway- header conversion
* chore: add docs
This commit is contained in:
35
gateway/config.go
Normal file
35
gateway/config.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/zeromicro/go-zero/rest"
|
||||
"github.com/zeromicro/go-zero/zrpc"
|
||||
)
|
||||
|
||||
type (
|
||||
// GatewayConf is the configuration for gateway.
|
||||
GatewayConf struct {
|
||||
rest.RestConf
|
||||
Upstreams []upstream
|
||||
Timeout time.Duration `json:",default=5s"`
|
||||
}
|
||||
|
||||
// mapping is a mapping between a gateway route and a upstream rpc method.
|
||||
mapping struct {
|
||||
// Method is the HTTP method, like GET, POST, PUT, DELETE.
|
||||
Method string
|
||||
// Path is the HTTP path.
|
||||
Path string
|
||||
// Rpc is the gRPC rpc method, with format of package.service/method
|
||||
Rpc string
|
||||
}
|
||||
// upstream is the configuration for upstream.
|
||||
upstream struct {
|
||||
// Grpc is the target of upstream.
|
||||
Grpc zrpc.RpcClientConf
|
||||
// ProtoSet is the file of proto set, like hello.pb
|
||||
ProtoSet string `json:",optional"`
|
||||
Mapping []mapping
|
||||
}
|
||||
)
|
||||
29
gateway/headerbuilder.go
Normal file
29
gateway/headerbuilder.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
metadataHeaderPrefix = "Grpc-Metadata-"
|
||||
metadataPrefix = "gateway-"
|
||||
)
|
||||
|
||||
func buildHeaders(header http.Header) []string {
|
||||
var headers []string
|
||||
|
||||
for k, v := range header {
|
||||
if !strings.HasPrefix(k, metadataHeaderPrefix) {
|
||||
continue
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s%s", metadataPrefix, strings.TrimPrefix(k, metadataHeaderPrefix))
|
||||
for _, vv := range v {
|
||||
headers = append(headers, key+":"+vv)
|
||||
}
|
||||
}
|
||||
|
||||
return headers
|
||||
}
|
||||
21
gateway/headerbuilder_test.go
Normal file
21
gateway/headerbuilder_test.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestBuildHeadersNoValue(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "/", nil)
|
||||
req.Header.Add("a", "b")
|
||||
assert.Nil(t, buildHeaders(req.Header))
|
||||
}
|
||||
|
||||
func TestBuildHeadersWithValues(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "/", nil)
|
||||
req.Header.Add("grpc-metadata-a", "b")
|
||||
req.Header.Add("grpc-metadata-b", "b")
|
||||
assert.EqualValues(t, []string{"gateway-A:b", "gateway-B:b"}, buildHeaders(req.Header))
|
||||
}
|
||||
56
gateway/readme.md
Normal file
56
gateway/readme.md
Normal file
@@ -0,0 +1,56 @@
|
||||
# Gateway
|
||||
|
||||
## Usage
|
||||
|
||||
- main.go
|
||||
|
||||
```go
|
||||
var configFile = flag.String("f", "config.yaml", "config file")
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
var c gateway.GatewayConf
|
||||
conf.MustLoad(*configFile, &c)
|
||||
gw := gateway.MustNewServer(c)
|
||||
defer gw.Stop()
|
||||
gw.Start()
|
||||
}
|
||||
```
|
||||
|
||||
- config.yaml
|
||||
|
||||
```yaml
|
||||
Name: demo-gateway
|
||||
Host: localhost
|
||||
Port: 8888
|
||||
Upstreams:
|
||||
- Grpc:
|
||||
Etcd:
|
||||
Hosts:
|
||||
- localhost:2379
|
||||
Key: hello.rpc
|
||||
# protoset mode
|
||||
ProtoSet: hello.pb
|
||||
Mapping:
|
||||
- Method: get
|
||||
Path: /pingHello/:ping
|
||||
Rpc: hello.Hello/Ping
|
||||
- Grpc:
|
||||
Endpoints:
|
||||
- localhost:8081
|
||||
# reflection mode, no ProtoSet settings
|
||||
Mapping:
|
||||
- Method: post
|
||||
Path: /pingWorld
|
||||
Rpc: world.World/Ping
|
||||
```
|
||||
|
||||
## Generate ProtoSet files
|
||||
|
||||
- example command
|
||||
|
||||
```shell
|
||||
protoc --descriptor_set_out=hello.pb hello.proto
|
||||
```
|
||||
|
||||
43
gateway/requestparser.go
Normal file
43
gateway/requestparser.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/fullstorydev/grpcurl"
|
||||
"github.com/golang/protobuf/jsonpb"
|
||||
"github.com/zeromicro/go-zero/rest/pathvar"
|
||||
)
|
||||
|
||||
func newRequestParser(r *http.Request, resolver jsonpb.AnyResolver) (grpcurl.RequestParser, error) {
|
||||
vars := pathvar.Vars(r)
|
||||
if len(vars) == 0 {
|
||||
return grpcurl.NewJSONRequestParser(r.Body, resolver), nil
|
||||
}
|
||||
|
||||
if r.ContentLength == 0 {
|
||||
var buf bytes.Buffer
|
||||
if err := json.NewEncoder(&buf).Encode(vars); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return grpcurl.NewJSONRequestParser(&buf, resolver), nil
|
||||
}
|
||||
|
||||
m := make(map[string]interface{})
|
||||
if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for k, v := range vars {
|
||||
m[k] = v
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := json.NewEncoder(&buf).Encode(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return grpcurl.NewJSONRequestParser(&buf, resolver), nil
|
||||
}
|
||||
48
gateway/requestparser_test.go
Normal file
48
gateway/requestparser_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/zeromicro/go-zero/rest/pathvar"
|
||||
)
|
||||
|
||||
func TestNewRequestParserNoVar(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "/", nil)
|
||||
parser, err := newRequestParser(req, nil)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, parser)
|
||||
}
|
||||
|
||||
func TestNewRequestParserWithVars(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "/", nil)
|
||||
req = pathvar.WithVars(req, map[string]string{"a": "b"})
|
||||
parser, err := newRequestParser(req, nil)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, parser)
|
||||
}
|
||||
|
||||
func TestNewRequestParserNoVarWithBody(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "/", strings.NewReader(`{"a": "b"}`))
|
||||
parser, err := newRequestParser(req, nil)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, parser)
|
||||
}
|
||||
|
||||
func TestNewRequestParserWithVarsWithBody(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "/", strings.NewReader(`{"a": "b"}`))
|
||||
req = pathvar.WithVars(req, map[string]string{"c": "d"})
|
||||
parser, err := newRequestParser(req, nil)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, parser)
|
||||
}
|
||||
|
||||
func TestNewRequestParserWithVarsWithWrongBody(t *testing.T) {
|
||||
req := httptest.NewRequest("GET", "/", strings.NewReader(`{"a": "b"`))
|
||||
req = pathvar.WithVars(req, map[string]string{"c": "d"})
|
||||
parser, err := newRequestParser(req, nil)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, parser)
|
||||
}
|
||||
116
gateway/server.go
Normal file
116
gateway/server.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fullstorydev/grpcurl"
|
||||
"github.com/golang/protobuf/jsonpb"
|
||||
"github.com/jhump/protoreflect/grpcreflect"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/mr"
|
||||
"github.com/zeromicro/go-zero/rest"
|
||||
"github.com/zeromicro/go-zero/rest/httpx"
|
||||
"github.com/zeromicro/go-zero/zrpc"
|
||||
"google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
|
||||
)
|
||||
|
||||
// Server is a gateway server.
|
||||
type Server struct {
|
||||
svr *rest.Server
|
||||
upstreams []upstream
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// MustNewServer creates a new gateway server.
|
||||
func MustNewServer(c GatewayConf) *Server {
|
||||
return &Server{
|
||||
svr: rest.MustNewServer(c.RestConf),
|
||||
upstreams: c.Upstreams,
|
||||
timeout: c.Timeout,
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the gateway server.
|
||||
func (s *Server) Start() {
|
||||
logx.Must(s.build())
|
||||
s.svr.Start()
|
||||
}
|
||||
|
||||
// Stop stops the gateway server.
|
||||
func (s *Server) Stop() {
|
||||
s.svr.Stop()
|
||||
}
|
||||
|
||||
func (s *Server) build() error {
|
||||
return mr.MapReduceVoid(func(source chan<- interface{}) {
|
||||
for _, up := range s.upstreams {
|
||||
source <- up
|
||||
}
|
||||
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
|
||||
up := item.(upstream)
|
||||
cli := zrpc.MustNewClient(up.Grpc)
|
||||
source, err := s.createDescriptorSource(cli, up)
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
return
|
||||
}
|
||||
|
||||
resolver := grpcurl.AnyResolverFromDescriptorSource(source)
|
||||
for _, m := range up.Mapping {
|
||||
writer.Write(rest.Route{
|
||||
Method: strings.ToUpper(m.Method),
|
||||
Path: m.Path,
|
||||
Handler: s.buildHandler(source, resolver, cli, m),
|
||||
})
|
||||
}
|
||||
}, func(pipe <-chan interface{}, cancel func(error)) {
|
||||
for item := range pipe {
|
||||
route := item.(rest.Route)
|
||||
s.svr.AddRoute(route)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) buildHandler(source grpcurl.DescriptorSource, resolver jsonpb.AnyResolver,
|
||||
cli zrpc.Client, m mapping) func(http.ResponseWriter, *http.Request) {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
handler := &grpcurl.DefaultEventHandler{
|
||||
Out: w,
|
||||
Formatter: grpcurl.NewJSONFormatter(true,
|
||||
grpcurl.AnyResolverFromDescriptorSource(source)),
|
||||
}
|
||||
parser, err := newRequestParser(r, resolver)
|
||||
if err != nil {
|
||||
httpx.Error(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, can := context.WithTimeout(r.Context(), s.timeout)
|
||||
defer can()
|
||||
if err := grpcurl.InvokeRPC(ctx, source, cli.Conn(), m.Rpc, buildHeaders(r.Header),
|
||||
handler, parser.Next); err != nil {
|
||||
httpx.Error(w, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) createDescriptorSource(cli zrpc.Client, up upstream) (grpcurl.DescriptorSource, error) {
|
||||
var source grpcurl.DescriptorSource
|
||||
var err error
|
||||
|
||||
if len(up.ProtoSet) > 0 {
|
||||
source, err = grpcurl.DescriptorSourceFromProtoSets(up.ProtoSet)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
refCli := grpc_reflection_v1alpha.NewServerReflectionClient(cli.Conn())
|
||||
client := grpcreflect.NewClient(context.Background(), refCli)
|
||||
source = grpcurl.DescriptorSourceFromServer(context.Background(), client)
|
||||
}
|
||||
|
||||
return source, nil
|
||||
}
|
||||
Reference in New Issue
Block a user