feat: more meaningful error messages, close body on httpc requests (#2238)
* feat: more meaningful error messages, close body on httpc requests * fix: test failure
This commit is contained in:
@@ -27,6 +27,8 @@ type (
|
|||||||
|
|
||||||
// Upstream is the configuration for an upstream.
|
// Upstream is the configuration for an upstream.
|
||||||
Upstream struct {
|
Upstream struct {
|
||||||
|
// Name is the name of the upstream.
|
||||||
|
Name string `json:",optional"`
|
||||||
// Grpc is the target of the upstream.
|
// Grpc is the target of the upstream.
|
||||||
Grpc zrpc.RpcClientConf
|
Grpc zrpc.RpcClientConf
|
||||||
// ProtoSets is the file list of proto set, like [hello.pb].
|
// ProtoSets is the file list of proto set, like [hello.pb].
|
||||||
|
|||||||
@@ -59,6 +59,10 @@ func (s *Server) Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) build() error {
|
func (s *Server) build() error {
|
||||||
|
if err := s.ensureUpstreamNames(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return mr.MapReduceVoid(func(source chan<- interface{}) {
|
return mr.MapReduceVoid(func(source chan<- interface{}) {
|
||||||
for _, up := range s.upstreams {
|
for _, up := range s.upstreams {
|
||||||
source <- up
|
source <- up
|
||||||
@@ -68,13 +72,13 @@ func (s *Server) build() error {
|
|||||||
cli := zrpc.MustNewClient(up.Grpc)
|
cli := zrpc.MustNewClient(up.Grpc)
|
||||||
source, err := s.createDescriptorSource(cli, up)
|
source, err := s.createDescriptorSource(cli, up)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel(err)
|
cancel(fmt.Errorf("%s: %w", up.Name, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
methods, err := internal.GetMethods(source)
|
methods, err := internal.GetMethods(source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel(err)
|
cancel(fmt.Errorf("%s: %w", up.Name, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -95,7 +99,7 @@ func (s *Server) build() error {
|
|||||||
}
|
}
|
||||||
for _, m := range up.Mappings {
|
for _, m := range up.Mappings {
|
||||||
if _, ok := methodSet[m.RpcPath]; !ok {
|
if _, ok := methodSet[m.RpcPath]; !ok {
|
||||||
cancel(fmt.Errorf("rpc method %s not found", m.RpcPath))
|
cancel(fmt.Errorf("%s: rpc method %s not found", up.Name, m.RpcPath))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -162,6 +166,19 @@ func (s *Server) createDescriptorSource(cli zrpc.Client, up Upstream) (grpcurl.D
|
|||||||
return source, nil
|
return source, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) ensureUpstreamNames() error {
|
||||||
|
for _, up := range s.upstreams {
|
||||||
|
target, err := up.Grpc.BuildTarget()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
up.Name = target
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) prepareMetadata(header http.Header) []string {
|
func (s *Server) prepareMetadata(header http.Header) []string {
|
||||||
vals := internal.ProcessHeaders(header)
|
vals := internal.ProcessHeaders(header)
|
||||||
if s.processHeader != nil {
|
if s.processHeader != nil {
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ func ParseHeaders(resp *http.Response, val interface{}) error {
|
|||||||
|
|
||||||
// ParseJsonBody parses the response body, which should be in json content type.
|
// ParseJsonBody parses the response body, which should be in json content type.
|
||||||
func ParseJsonBody(resp *http.Response, val interface{}) error {
|
func ParseJsonBody(resp *http.Response, val interface{}) error {
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if withJsonBody(resp) {
|
if withJsonBody(resp) {
|
||||||
return mapping.UnmarshalJsonReader(resp.Body, val)
|
return mapping.UnmarshalJsonReader(resp.Body, val)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,6 @@ func BuildDirectTarget(endpoints []string) string {
|
|||||||
|
|
||||||
// BuildDiscovTarget returns a string that represents the given endpoints with discov schema.
|
// BuildDiscovTarget returns a string that represents the given endpoints with discov schema.
|
||||||
func BuildDiscovTarget(endpoints []string, key string) string {
|
func BuildDiscovTarget(endpoints []string, key string) string {
|
||||||
return fmt.Sprintf("%s://%s/%s", internal.DiscovScheme,
|
return fmt.Sprintf("%s://%s/%s", internal.EtcdScheme,
|
||||||
strings.Join(endpoints, internal.EndpointSep), key)
|
strings.Join(endpoints, internal.EndpointSep), key)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,5 +13,5 @@ func TestBuildDirectTarget(t *testing.T) {
|
|||||||
|
|
||||||
func TestBuildDiscovTarget(t *testing.T) {
|
func TestBuildDiscovTarget(t *testing.T) {
|
||||||
target := BuildDiscovTarget([]string{"localhost:123", "localhost:456"}, "foo")
|
target := BuildDiscovTarget([]string{"localhost:123", "localhost:456"}, "foo")
|
||||||
assert.Equal(t, "discov://localhost:123,localhost:456/foo", target)
|
assert.Equal(t, "etcd://localhost:123,localhost:456/foo", target)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user