From 946dde008f3d66634ab1ae4624a44efb10aa5be6 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 3 Aug 2022 16:46:34 -0700 Subject: [PATCH] xdsclient: NACK endpoint resources with zero weight (#5560) --- internal/testutils/xds/e2e/clientresources.go | 1 + xds/internal/testutils/protos.go | 3 +++ xds/internal/xdsclient/controller/v2_eds_test.go | 6 +++--- .../xdsclient/xdsresource/unmarshal_eds.go | 16 ++++++++++++---- .../xdsclient/xdsresource/unmarshal_eds_test.go | 10 ++++++++++ 5 files changed, 29 insertions(+), 7 deletions(-) diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index f3f7f6307c53..f5363aff91f4 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -370,6 +370,7 @@ func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpoin PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}}, }}, }}, + LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1}, }) } return &v3endpointpb.ClusterLoadAssignment{ diff --git a/xds/internal/testutils/protos.go b/xds/internal/testutils/protos.go index fc3cdf307fcd..b9fbc7448e18 100644 --- a/xds/internal/testutils/protos.go +++ b/xds/internal/testutils/protos.go @@ -118,6 +118,9 @@ func (clab *ClusterLoadAssignmentBuilder) AddLocality(subzone string, weight uin lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]} } } + if lbe.LoadBalancingWeight == nil { + lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 1} + } lbEndPoints = append(lbEndPoints, lbe) } diff --git a/xds/internal/xdsclient/controller/v2_eds_test.go b/xds/internal/xdsclient/controller/v2_eds_test.go index aaa84f9c3d63..665fee92a1ea 100644 --- a/xds/internal/xdsclient/controller/v2_eds_test.go +++ b/xds/internal/xdsclient/controller/v2_eds_test.go @@ -117,7 +117,7 @@ func (s) TestEDSHandleResponse(t *testing.T) { "not-goodEDSName": {Update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: "addr1:314"}}, + Endpoints: []xdsresource.Endpoint{{Address: "addr1:314", Weight: 1}}, ID: internal.LocalityID{SubZone: "locality-1"}, Priority: 0, Weight: 1, @@ -140,13 +140,13 @@ func (s) TestEDSHandleResponse(t *testing.T) { goodEDSName: {Update: xdsresource.EndpointsUpdate{ Localities: []xdsresource.Locality{ { - Endpoints: []xdsresource.Endpoint{{Address: "addr1:314"}}, + Endpoints: []xdsresource.Endpoint{{Address: "addr1:314", Weight: 1}}, ID: internal.LocalityID{SubZone: "locality-1"}, Priority: 1, Weight: 1, }, { - Endpoints: []xdsresource.Endpoint{{Address: "addr2:159"}}, + Endpoints: []xdsresource.Endpoint{{Address: "addr2:159", Weight: 1}}, ID: internal.LocalityID{SubZone: "locality-2"}, Priority: 0, Weight: 1, diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go index 7cc12d73d6d5..a2e8fd1c173f 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go @@ -90,16 +90,20 @@ func parseDropPolicy(dropPolicy *v3endpointpb.ClusterLoadAssignment_Policy_DropO } } -func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint) []Endpoint { +func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint) ([]Endpoint, error) { endpoints := make([]Endpoint, 0, len(lbEndpoints)) for _, lbEndpoint := range lbEndpoints { + weight := lbEndpoint.GetLoadBalancingWeight().GetValue() + if weight == 0 { + return nil, fmt.Errorf("EDS response contains an endpoint with zero weight: %+v", lbEndpoint) + } endpoints = append(endpoints, Endpoint{ HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()), - Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(), + Weight: weight, }) } - return endpoints + return endpoints, nil } func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment, logger *grpclog.PrefixLogger) (EndpointsUpdate, error) { @@ -134,9 +138,13 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment, logger *grpclog.Pr return EndpointsUpdate{}, fmt.Errorf("duplicate locality %s with the same priority %v", lidStr, priority) } localitiesWithPriority[lidStr] = true + endpoints, err := parseEndpoints(locality.GetLbEndpoints()) + if err != nil { + return EndpointsUpdate{}, err + } ret.Localities = append(ret.Localities, Locality{ ID: lid, - Endpoints: parseEndpoints(locality.GetLbEndpoints()), + Endpoints: endpoints, Weight: locality.GetLoadBalancingWeight().GetValue(), Priority: priority, }) diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go index 3ce069c29664..28ceb11c6e16 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go @@ -64,6 +64,16 @@ func (s) TestEDSParseRespProto(t *testing.T) { want: EndpointsUpdate{}, wantErr: true, }, + { + name: "zero-endpoint-weight", + m: func() *v3endpointpb.ClusterLoadAssignment { + clab0 := newClaBuilder("test", nil) + clab0.addLocality("locality-0", 1, 0, []string{"addr1:314"}, &addLocalityOptions{Weight: []uint32{0}}) + return clab0.Build() + }(), + want: EndpointsUpdate{}, + wantErr: true, + }, { name: "duplicate-locality-in-the-same-priority", m: func() *v3endpointpb.ClusterLoadAssignment {