diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index 8e47e7380f9..79c5c293ee7 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -211,7 +211,11 @@ func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.Scat opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit()) if err != nil { - return nil, err + header := s.errorHeader(&schedulingpb.Error{ + Type: schedulingpb.ErrorType_UNKNOWN, + Message: err.Error(), + }) + return &schedulingpb.ScatterRegionsResponse{Header: header}, nil } percentage := 100 if len(failures) > 0 { @@ -243,7 +247,7 @@ func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOper if r == nil { header := s.errorHeader(&schedulingpb.Error{ Type: schedulingpb.ErrorType_UNKNOWN, - Message: "Not Found", + Message: "region not found", }) return &schedulingpb.GetOperatorResponse{Header: header}, nil } diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index d6e7441738d..ed0a094dbcf 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -52,6 +52,8 @@ var ( scatterUnnecessaryCounter = scatterCounter.WithLabelValues("unnecessary", "") scatterFailCounter = scatterCounter.WithLabelValues("fail", "") scatterSuccessCounter = scatterCounter.WithLabelValues("success", "") + errRegionNotFound = errors.New("region not found") + errEmptyRegion = errors.New("empty region") ) const ( @@ -184,7 +186,7 @@ func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group s regions := r.cluster.ScanRegions(startKey, endKey, -1) if len(regions) < 1 { scatterSkipEmptyRegionCounter.Inc() - return 0, nil, errors.New("empty region") + return 0, nil, errEmptyRegion } failures := make(map[uint64]error, len(regions)) regionMap := make(map[uint64]*core.RegionInfo, len(regions)) @@ -203,7 +205,14 @@ func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group s func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, retryLimit int) (int, map[uint64]error, error) { if len(regionsID) < 1 { scatterSkipEmptyRegionCounter.Inc() - return 0, nil, errors.New("empty region") + return 0, nil, errEmptyRegion + } + if len(regionsID) == 1 { + region := r.cluster.GetRegion(regionsID[0]) + if region == nil { + scatterSkipNoRegionCounter.Inc() + return 0, nil, errRegionNotFound + } } failures := make(map[uint64]error, len(regionsID)) regions := make([]*core.RegionInfo, 0, len(regionsID)) @@ -238,7 +247,7 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, failures map[uint64]error, group string, retryLimit int) (int, error) { if len(regions) < 1 { scatterSkipEmptyRegionCounter.Inc() - return 0, errors.New("empty region") + return 0, errEmptyRegion } if retryLimit > maxRetryLimit { retryLimit = maxRetryLimit diff --git a/server/grpc_service.go b/server/grpc_service.go index 2eca668e04d..a23b0c5912b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -985,6 +985,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.SchedulingClient, error) { forwardedHost, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) pre := s.schedulingClient.Load() + // 1. forwardedHost is not empty and pre is empty, update the schedulingClient + // 2. forwardedHost is not empty and forwardedHost is not equal to pre, update the schedulingClient + // 3. forwardedHost is not empty and forwardedHost is equal to pre, return pre + // 4. forwardedHost is empty, return nil if forwardedHost != "" && ((pre == nil) || (pre != nil && forwardedHost != pre.(*schedulingClient).getPrimaryAddr())) { client, err := s.getDelegateClient(ctx, forwardedHost) if err != nil { @@ -996,6 +1000,8 @@ func (s *GrpcServer) updateSchedulingClient(ctx context.Context) (schedulingpb.S } s.schedulingClient.Store(forwardCli) return forwardCli.getClient(), nil + } else if forwardedHost != "" && (pre != nil && forwardedHost == pre.(*schedulingClient).getPrimaryAddr()) { + return pre.(*schedulingClient).getClient(), nil } return nil, ErrNotFoundSchedulingAddr } @@ -1534,8 +1540,13 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest // AskBatchSplit implements gRPC PDServer. func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) { if s.IsAPIServiceMode() { - s.updateSchedulingClient(ctx) - if s.schedulingClient.Load() != nil { + forwardCli, err := s.updateSchedulingClient(ctx) + if err != nil { + return &pdpb.AskBatchSplitResponse{ + Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + if forwardCli != nil { req := &schedulingpb.AskBatchSplitRequest{ Header: &schedulingpb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -1710,7 +1721,14 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg }, nil } if forwardCli != nil { - regionsID := request.GetRegionsId() + var regionsID []uint64 + // nolint + if request.GetRegionId() != 0 { + // nolint + regionsID = []uint64{request.GetRegionId()} + } else { + regionsID = request.GetRegionsId() + } if len(regionsID) == 0 { return &pdpb.ScatterRegionResponse{ Header: s.invalidValue("regions id is required"), @@ -2028,6 +2046,15 @@ func (s *GrpcServer) invalidValue(msg string) *pdpb.ResponseHeader { func (s *GrpcServer) convertHeader(header *schedulingpb.ResponseHeader) *pdpb.ResponseHeader { switch header.GetError().GetType() { case schedulingpb.ErrorType_UNKNOWN: + if strings.Contains(header.GetError().GetMessage(), "region not found") { + return &pdpb.ResponseHeader{ + ClusterId: header.GetClusterId(), + Error: &pdpb.Error{ + Type: pdpb.ErrorType_REGION_NOT_FOUND, + Message: header.GetError().GetMessage(), + }, + } + } return &pdpb.ResponseHeader{ ClusterId: header.GetClusterId(), Error: &pdpb.Error{ @@ -2393,6 +2420,9 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC } func (s *GrpcServer) createSchedulingStream(client *grpc.ClientConn) (schedulingpb.Scheduling_RegionHeartbeatClient, context.CancelFunc, error) { + if client == nil { + return nil, nil, errors.New("connection is not set") + } done := make(chan struct{}) ctx, cancel := context.WithCancel(s.ctx) go grpcutil.CheckStream(ctx, cancel, done)