feat: support customized header to metadata processor (#2162)
* chore: add more tests * feat: support customized header processor
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -11,8 +11,8 @@ const (
|
|||||||
metadataPrefix = "gateway-"
|
metadataPrefix = "gateway-"
|
||||||
)
|
)
|
||||||
|
|
||||||
// BuildHeaders builds the headers for the gateway from HTTP headers.
|
// ProcessHeaders builds the headers for the gateway from HTTP headers.
|
||||||
func BuildHeaders(header http.Header) []string {
|
func ProcessHeaders(header http.Header) []string {
|
||||||
var headers []string
|
var headers []string
|
||||||
|
|
||||||
for k, v := range header {
|
for k, v := range header {
|
||||||
@@ -10,12 +10,12 @@ import (
|
|||||||
func TestBuildHeadersNoValue(t *testing.T) {
|
func TestBuildHeadersNoValue(t *testing.T) {
|
||||||
req := httptest.NewRequest("GET", "/", nil)
|
req := httptest.NewRequest("GET", "/", nil)
|
||||||
req.Header.Add("a", "b")
|
req.Header.Add("a", "b")
|
||||||
assert.Nil(t, BuildHeaders(req.Header))
|
assert.Nil(t, ProcessHeaders(req.Header))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBuildHeadersWithValues(t *testing.T) {
|
func TestBuildHeadersWithValues(t *testing.T) {
|
||||||
req := httptest.NewRequest("GET", "/", nil)
|
req := httptest.NewRequest("GET", "/", nil)
|
||||||
req.Header.Add("grpc-metadata-a", "b")
|
req.Header.Add("grpc-metadata-a", "b")
|
||||||
req.Header.Add("grpc-metadata-b", "b")
|
req.Header.Add("grpc-metadata-b", "b")
|
||||||
assert.EqualValues(t, []string{"gateway-A:b", "gateway-B:b"}, BuildHeaders(req.Header))
|
assert.ElementsMatch(t, []string{"gateway-A:b", "gateway-B:b"}, ProcessHeaders(req.Header))
|
||||||
}
|
}
|
||||||
@@ -19,20 +19,31 @@ import (
|
|||||||
"google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
|
"google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Server is a gateway server.
|
type (
|
||||||
type Server struct {
|
// Server is a gateway server.
|
||||||
*rest.Server
|
Server struct {
|
||||||
upstreams []upstream
|
*rest.Server
|
||||||
timeout time.Duration
|
upstreams []upstream
|
||||||
}
|
timeout time.Duration
|
||||||
|
processHeader func(http.Header) []string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option defines the method to customize Server.
|
||||||
|
Option func(svr *Server)
|
||||||
|
)
|
||||||
|
|
||||||
// MustNewServer creates a new gateway server.
|
// MustNewServer creates a new gateway server.
|
||||||
func MustNewServer(c GatewayConf) *Server {
|
func MustNewServer(c GatewayConf, opts ...Option) *Server {
|
||||||
return &Server{
|
svr := &Server{
|
||||||
Server: rest.MustNewServer(c.RestConf),
|
Server: rest.MustNewServer(c.RestConf),
|
||||||
upstreams: c.Upstreams,
|
upstreams: c.Upstreams,
|
||||||
timeout: c.Timeout,
|
timeout: c.Timeout,
|
||||||
}
|
}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(svr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return svr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the gateway server.
|
// Start starts the gateway server.
|
||||||
@@ -120,7 +131,7 @@ func (s *Server) buildHandler(source grpcurl.DescriptorSource, resolver jsonpb.A
|
|||||||
defer can()
|
defer can()
|
||||||
|
|
||||||
w.Header().Set(httpx.ContentType, httpx.JsonContentType)
|
w.Header().Set(httpx.ContentType, httpx.JsonContentType)
|
||||||
if err := grpcurl.InvokeRPC(ctx, source, cli.Conn(), rpcPath, internal.BuildHeaders(r.Header),
|
if err := grpcurl.InvokeRPC(ctx, source, cli.Conn(), rpcPath, s.prepareMetadata(r.Header),
|
||||||
handler, parser.Next); err != nil {
|
handler, parser.Next); err != nil {
|
||||||
httpx.Error(w, err)
|
httpx.Error(w, err)
|
||||||
}
|
}
|
||||||
@@ -144,3 +155,20 @@ func (s *Server) createDescriptorSource(cli zrpc.Client, up upstream) (grpcurl.D
|
|||||||
|
|
||||||
return source, nil
|
return source, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) prepareMetadata(header http.Header) []string {
|
||||||
|
vals := internal.ProcessHeaders(header)
|
||||||
|
if s.processHeader != nil {
|
||||||
|
vals = append(vals, s.processHeader(header)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return vals
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithHeaderProcessor sets a processor to process request headers.
|
||||||
|
// The returned headers are used as metadata to invoke the RPC.
|
||||||
|
func WithHeaderProcessor(processHeader func(http.Header) []string) func(*Server) {
|
||||||
|
return func(s *Server) {
|
||||||
|
s.processHeader = processHeader
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user