diff --git a/development/mimir-ingest-storage/.data-mimir-write-1/.gitignore b/development/mimir-ingest-storage/.data-mimir-write-zone-a-1/.gitignore similarity index 100% rename from development/mimir-ingest-storage/.data-mimir-write-1/.gitignore rename to development/mimir-ingest-storage/.data-mimir-write-zone-a-1/.gitignore diff --git a/development/mimir-ingest-storage/.data-mimir-write-2/.gitignore b/development/mimir-ingest-storage/.data-mimir-write-zone-a-2/.gitignore similarity index 100% rename from development/mimir-ingest-storage/.data-mimir-write-2/.gitignore rename to development/mimir-ingest-storage/.data-mimir-write-zone-a-2/.gitignore diff --git a/development/mimir-ingest-storage/.data-mimir-write-3/.gitignore b/development/mimir-ingest-storage/.data-mimir-write-zone-a-3/.gitignore similarity index 100% rename from development/mimir-ingest-storage/.data-mimir-write-3/.gitignore rename to development/mimir-ingest-storage/.data-mimir-write-zone-a-3/.gitignore diff --git a/development/mimir-ingest-storage/.data-mimir-write-zone-b-1/.gitignore b/development/mimir-ingest-storage/.data-mimir-write-zone-b-1/.gitignore new file mode 100644 index 00000000000..d6b7ef32c84 --- /dev/null +++ b/development/mimir-ingest-storage/.data-mimir-write-zone-b-1/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/development/mimir-ingest-storage/.data-mimir-write-zone-b-2/.gitignore b/development/mimir-ingest-storage/.data-mimir-write-zone-b-2/.gitignore new file mode 100644 index 00000000000..d6b7ef32c84 --- /dev/null +++ b/development/mimir-ingest-storage/.data-mimir-write-zone-b-2/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/development/mimir-ingest-storage/.data-mimir-write-zone-b-3/.gitignore b/development/mimir-ingest-storage/.data-mimir-write-zone-b-3/.gitignore new file mode 100644 index 00000000000..d6b7ef32c84 --- /dev/null +++ b/development/mimir-ingest-storage/.data-mimir-write-zone-b-3/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/development/mimir-ingest-storage/.dockerignore b/development/mimir-ingest-storage/.dockerignore index cd472131b5b..9bfee85fbfc 100644 --- a/development/mimir-ingest-storage/.dockerignore +++ b/development/mimir-ingest-storage/.dockerignore @@ -1,4 +1,7 @@ -.data-mimir-write-1 -.data-mimir-write-2 -.data-mimir-write-3 +.data-mimir-write-zone-a-1 +.data-mimir-write-zone-b-1 +.data-mimir-write-zone-a-2 +.data-mimir-write-zone-b-2 +.data-mimir-write-zone-a-3 +.data-mimir-write-zone-b-3 .data-minio diff --git a/development/mimir-ingest-storage/compose-up.sh b/development/mimir-ingest-storage/compose-up.sh index 61f2a84bc80..e1200b927b8 100755 --- a/development/mimir-ingest-storage/compose-up.sh +++ b/development/mimir-ingest-storage/compose-up.sh @@ -21,5 +21,5 @@ cd "$SCRIPT_DIR" && make # -gcflags "all=-N -l" disables optimizations that allow for better run with combination with Delve debugger. # GOARCH is not changed. 
CGO_ENABLED=0 GOOS=linux go build -mod=vendor -gcflags "all=-N -l" -o "${SCRIPT_DIR}"/mimir "${SCRIPT_DIR}"/../../cmd/mimir -docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml build --build-arg BUILD_IMAGE="${BUILD_IMAGE}" mimir-write-1 +docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml build --build-arg BUILD_IMAGE="${BUILD_IMAGE}" mimir-write-zone-a-1 docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml up "$@" diff --git a/development/mimir-ingest-storage/compose-update-mimir.sh b/development/mimir-ingest-storage/compose-update-mimir.sh index 6dbb06a6c10..ca1cd58f627 100755 --- a/development/mimir-ingest-storage/compose-update-mimir.sh +++ b/development/mimir-ingest-storage/compose-update-mimir.sh @@ -22,6 +22,6 @@ cd "$SCRIPT_DIR" && make # GOARCH is not changed. CGO_ENABLED=0 GOOS=linux go build -mod=vendor -gcflags "all=-N -l" -o "${SCRIPT_DIR}"/mimir "${SCRIPT_DIR}"/../../cmd/mimir # Build docker image -docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml build --build-arg BUILD_IMAGE="${BUILD_IMAGE}" mimir-write-1 +docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml build --build-arg BUILD_IMAGE="${BUILD_IMAGE}" mimir-write-zone-a-1 # Recreate Mimir containers docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml up -d --force-recreate "$@" mimir-write-{1..3} mimir-read-{1..2} mimir-backend-{1..2} diff --git a/development/mimir-ingest-storage/config/grafana-agent.yaml b/development/mimir-ingest-storage/config/grafana-agent.yaml index 14c0fc06b44..409c7362067 100644 --- a/development/mimir-ingest-storage/config/grafana-agent.yaml +++ b/development/mimir-ingest-storage/config/grafana-agent.yaml @@ -10,7 +10,13 @@ prometheus: scrape_configs: - job_name: mimir-read-write-mode/mimir-write static_configs: - - targets: ['mimir-write-1:8080', 'mimir-write-2:8080', 'mimir-write-3:8080'] + - targets: + - 'mimir-write-zone-a-1:8080' + - 'mimir-write-zone-b-1:8080' + - 'mimir-write-zone-a-2:8080' + - 'mimir-write-zone-b-2:8080' + - 'mimir-write-zone-a-3:8080' + - 'mimir-write-zone-b-3:8080' labels: cluster: 'docker-compose' namespace: 'mimir-read-write-mode' @@ -31,4 +37,4 @@ prometheus: container: 'mimir-backend' remote_write: - - url: http://mimir-write-1:8080/api/v1/push + - url: http://mimir-write-zone-a-1:8080/api/v1/push diff --git a/development/mimir-ingest-storage/config/mimir.yaml b/development/mimir-ingest-storage/config/mimir.yaml index de969237f15..dc2cfc47e40 100644 --- a/development/mimir-ingest-storage/config/mimir.yaml +++ b/development/mimir-ingest-storage/config/mimir.yaml @@ -18,6 +18,11 @@ ingest_storage: ingester: return_only_grpc_errors: true + partition_ring: + min_partition_owners_count: 2 + min_partition_owners_duration: 10s + delete_inactive_partition_after: 1m + blocks_storage: s3: bucket_name: mimir-blocks @@ -43,7 +48,13 @@ blocks_storage: memberlist: # Use write replicas as seed nodes. 
- join_members: [ mimir-write-1, mimir-write-2, mimir-write-3 ] + join_members: + - mimir-write-zone-a-1 + - mimir-write-zone-b-1 + - mimir-write-zone-a-2 + - mimir-write-zone-b-2 + - mimir-write-zone-a-3 + - mimir-write-zone-b-3 ruler: rule_path: /data/ruler @@ -82,6 +93,7 @@ overrides_exporter: limits: native_histograms_ingestion_enabled: true + ingestion_rate: 100000 runtime_config: file: ./config/runtime.yaml diff --git a/development/mimir-ingest-storage/docker-compose.jsonnet b/development/mimir-ingest-storage/docker-compose.jsonnet index 8814498e39d..0ea0cb34a0b 100644 --- a/development/mimir-ingest-storage/docker-compose.jsonnet +++ b/development/mimir-ingest-storage/docker-compose.jsonnet @@ -14,23 +14,50 @@ std.manifestYamlDoc({ {}, write:: { - 'mimir-write-1': mimirService({ - name: 'mimir-write-1', + // Zone-a. + 'mimir-write-zone-a-1': mimirService({ + name: 'mimir-write-zone-a-1', target: 'write', publishedHttpPort: 8001, - extraVolumes: ['.data-mimir-write-1:/data:delegated'], + extraArguments: ['-ingester.ring.instance-availability-zone=zone-a'], + extraVolumes: ['.data-mimir-write-zone-a-1:/data:delegated'], }), - 'mimir-write-2': mimirService({ - name: 'mimir-write-2', + 'mimir-write-zone-a-2': mimirService({ + name: 'mimir-write-zone-a-2', target: 'write', publishedHttpPort: 8002, - extraVolumes: ['.data-mimir-write-2:/data:delegated'], + extraArguments: ['-ingester.ring.instance-availability-zone=zone-a'], + extraVolumes: ['.data-mimir-write-zone-a-2:/data:delegated'], }), - 'mimir-write-3': mimirService({ - name: 'mimir-write-3', + 'mimir-write-zone-a-3': mimirService({ + name: 'mimir-write-zone-a-3', target: 'write', publishedHttpPort: 8003, - extraVolumes: ['.data-mimir-write-3:/data:delegated'], + extraArguments: ['-ingester.ring.instance-availability-zone=zone-a'], + extraVolumes: ['.data-mimir-write-zone-a-3:/data:delegated'], + }), + + // Zone-b. 
+ 'mimir-write-zone-b-1': mimirService({ + name: 'mimir-write-zone-b-1', + target: 'write', + publishedHttpPort: 8011, + extraArguments: ['-ingester.ring.instance-availability-zone=zone-b'], + extraVolumes: ['.data-mimir-write-zone-b-1:/data:delegated'], + }), + 'mimir-write-zone-b-2': mimirService({ + name: 'mimir-write-zone-b-2', + target: 'write', + publishedHttpPort: 8012, + extraArguments: ['-ingester.ring.instance-availability-zone=zone-b'], + extraVolumes: ['.data-mimir-write-zone-b-2:/data:delegated'], + }), + 'mimir-write-zone-b-3': mimirService({ + name: 'mimir-write-zone-b-3', + target: 'write', + publishedHttpPort: 8013, + extraArguments: ['-ingester.ring.instance-availability-zone=zone-b'], + extraVolumes: ['.data-mimir-write-zone-b-3:/data:delegated'], }), }, @@ -176,6 +203,7 @@ std.manifestYamlDoc({ kafka: { condition: 'service_healthy' }, }, env: {}, + extraArguments: [], extraVolumes: [], memberlistBindPort: self.publishedHttpPort + 2000, }, @@ -192,7 +220,7 @@ std.manifestYamlDoc({ '-config.file=./config/mimir.yaml' % options, '-target=%(target)s' % options, '-activity-tracker.filepath=/activity/%(name)s' % options, - ], + ] + options.extraArguments, environment: [ '%s=%s' % [key, options.env[key]] for key in std.objectFields(options.env) diff --git a/development/mimir-ingest-storage/docker-compose.yml b/development/mimir-ingest-storage/docker-compose.yml index ae4e49a8a41..ec4022ba4e3 100644 --- a/development/mimir-ingest-storage/docker-compose.yml +++ b/development/mimir-ingest-storage/docker-compose.yml @@ -130,7 +130,7 @@ "volumes": - "./config:/mimir/config" - "./activity:/activity" - "mimir-write-1": + "mimir-write-zone-a-1": "build": "context": "." "dockerfile": "dev.dockerfile" @@ -138,22 +138,23 @@ - "./mimir" - "-config.file=./config/mimir.yaml" - "-target=write" - - "-activity-tracker.filepath=/activity/mimir-write-1" + - "-activity-tracker.filepath=/activity/mimir-write-zone-a-1" + - "-ingester.ring.instance-availability-zone=zone-a" "depends_on": "kafka": "condition": "service_healthy" "minio": "condition": "service_started" "environment": [] - "hostname": "mimir-write-1" + "hostname": "mimir-write-zone-a-1" "image": "mimir" "ports": - "8001:8080" "volumes": - "./config:/mimir/config" - "./activity:/activity" - - ".data-mimir-write-1:/data:delegated" - "mimir-write-2": + - ".data-mimir-write-zone-a-1:/data:delegated" + "mimir-write-zone-a-2": "build": "context": "." "dockerfile": "dev.dockerfile" @@ -161,22 +162,23 @@ - "./mimir" - "-config.file=./config/mimir.yaml" - "-target=write" - - "-activity-tracker.filepath=/activity/mimir-write-2" + - "-activity-tracker.filepath=/activity/mimir-write-zone-a-2" + - "-ingester.ring.instance-availability-zone=zone-a" "depends_on": "kafka": "condition": "service_healthy" "minio": "condition": "service_started" "environment": [] - "hostname": "mimir-write-2" + "hostname": "mimir-write-zone-a-2" "image": "mimir" "ports": - "8002:8080" "volumes": - "./config:/mimir/config" - "./activity:/activity" - - ".data-mimir-write-2:/data:delegated" - "mimir-write-3": + - ".data-mimir-write-zone-a-2:/data:delegated" + "mimir-write-zone-a-3": "build": "context": "." 
"dockerfile": "dev.dockerfile" @@ -184,21 +186,94 @@ - "./mimir" - "-config.file=./config/mimir.yaml" - "-target=write" - - "-activity-tracker.filepath=/activity/mimir-write-3" + - "-activity-tracker.filepath=/activity/mimir-write-zone-a-3" + - "-ingester.ring.instance-availability-zone=zone-a" "depends_on": "kafka": "condition": "service_healthy" "minio": "condition": "service_started" "environment": [] - "hostname": "mimir-write-3" + "hostname": "mimir-write-zone-a-3" "image": "mimir" "ports": - "8003:8080" "volumes": - "./config:/mimir/config" - "./activity:/activity" - - ".data-mimir-write-3:/data:delegated" + - ".data-mimir-write-zone-a-3:/data:delegated" + "mimir-write-zone-b-1": + "build": + "context": "." + "dockerfile": "dev.dockerfile" + "command": + - "./mimir" + - "-config.file=./config/mimir.yaml" + - "-target=write" + - "-activity-tracker.filepath=/activity/mimir-write-zone-b-1" + - "-ingester.ring.instance-availability-zone=zone-b" + "depends_on": + "kafka": + "condition": "service_healthy" + "minio": + "condition": "service_started" + "environment": [] + "hostname": "mimir-write-zone-b-1" + "image": "mimir" + "ports": + - "8011:8080" + "volumes": + - "./config:/mimir/config" + - "./activity:/activity" + - ".data-mimir-write-zone-b-1:/data:delegated" + "mimir-write-zone-b-2": + "build": + "context": "." + "dockerfile": "dev.dockerfile" + "command": + - "./mimir" + - "-config.file=./config/mimir.yaml" + - "-target=write" + - "-activity-tracker.filepath=/activity/mimir-write-zone-b-2" + - "-ingester.ring.instance-availability-zone=zone-b" + "depends_on": + "kafka": + "condition": "service_healthy" + "minio": + "condition": "service_started" + "environment": [] + "hostname": "mimir-write-zone-b-2" + "image": "mimir" + "ports": + - "8012:8080" + "volumes": + - "./config:/mimir/config" + - "./activity:/activity" + - ".data-mimir-write-zone-b-2:/data:delegated" + "mimir-write-zone-b-3": + "build": + "context": "." 
+ "dockerfile": "dev.dockerfile" + "command": + - "./mimir" + - "-config.file=./config/mimir.yaml" + - "-target=write" + - "-activity-tracker.filepath=/activity/mimir-write-zone-b-3" + - "-ingester.ring.instance-availability-zone=zone-b" + "depends_on": + "kafka": + "condition": "service_healthy" + "minio": + "condition": "service_started" + "environment": [] + "hostname": "mimir-write-zone-b-3" + "image": "mimir" + "ports": + - "8013:8080" + "volumes": + - "./config:/mimir/config" + - "./activity:/activity" + - ".data-mimir-write-zone-b-3:/data:delegated" "minio": "command": - "server" diff --git a/go.mod b/go.mod index fdd1f635ad2..7fab0cffacc 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20240208074945-f245b483eb15 + github.com/grafana/dskit v0.0.0-20240213103939-80881aa4a62f github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index 0947ca19627..9155f02bbe3 100644 --- a/go.sum +++ b/go.sum @@ -495,8 +495,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4= github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0= github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85Tnn+WEvr8fDpfwibmEPgfgFEaC87G24= github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= -github.com/grafana/dskit v0.0.0-20240208074945-f245b483eb15 h1:+oY4HiyUyGFY/DrLDBc7PcRwC1dIztT899JUPNxGVGE= -github.com/grafana/dskit v0.0.0-20240208074945-f245b483eb15/go.mod h1:x5DMwyr1kyirtHOxoFSZ7RnyOgHdGh03ZruupdPetQM= +github.com/grafana/dskit v0.0.0-20240213103939-80881aa4a62f h1:SU2XpJOzuclXYls6LyMgmQhv2TVLe7Oj6UbezYxZeM0= +github.com/grafana/dskit v0.0.0-20240213103939-80881aa4a62f/go.mod h1:x5DMwyr1kyirtHOxoFSZ7RnyOgHdGh03ZruupdPetQM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM= diff --git a/integration/configs.go b/integration/configs.go index e5aba64aba2..ed5fdf46e08 100644 --- a/integration/configs.go +++ b/integration/configs.go @@ -247,6 +247,10 @@ blocks_storage: // and faster integration tests. "-ingest-storage.kafka.last-produced-offset-poll-interval": "50ms", "-ingest-storage.kafka.last-produced-offset-retry-timeout": "1s", + + // Do not wait before switching an INACTIVE partition to ACTIVE. 
+ "-ingester.partition-ring.min-partition-owners-count": "0", + "-ingester.partition-ring.min-partition-owners-duration": "0s", } } ) diff --git a/pkg/api/api.go b/pkg/api/api.go index 399d9a635c4..d814cf67a16 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -280,6 +280,7 @@ type Ingester interface { FlushHandler(http.ResponseWriter, *http.Request) ShutdownHandler(http.ResponseWriter, *http.Request) PrepareShutdownHandler(http.ResponseWriter, *http.Request) + PreparePartitionDownscaleHandler(http.ResponseWriter, *http.Request) PushWithCleanup(context.Context, *mimirpb.WriteRequest, func()) error UserRegistryHandler(http.ResponseWriter, *http.Request) TenantsHandler(http.ResponseWriter, *http.Request) @@ -297,6 +298,7 @@ func (a *API) RegisterIngester(i Ingester) { a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, true, "GET", "POST") a.RegisterRoute("/ingester/prepare-shutdown", http.HandlerFunc(i.PrepareShutdownHandler), false, true, "GET", "POST", "DELETE") + a.RegisterRoute("/ingester/prepare-partition-downscale", http.HandlerFunc(i.PreparePartitionDownscaleHandler), false, true, "GET", "POST", "DELETE") a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, true, "GET", "POST") a.RegisterRoute("/ingester/tsdb_metrics", http.HandlerFunc(i.UserRegistryHandler), true, true, "GET") @@ -344,14 +346,22 @@ func (a *API) RegisterRulerAPI(r *ruler.API, configAPIEnabled bool, buildInfoHan } } -// RegisterRing registers the ring UI page associated with the distributor for writes. -func (a *API) RegisterRing(r http.Handler) { +// RegisterIngesterRing registers the ring UI page associated with the ingesters ring. +func (a *API) RegisterIngesterRing(r http.Handler) { a.indexPage.AddLinks(defaultWeight, "Ingester", []IndexPageLink{ {Desc: "Ring status", Path: "/ingester/ring"}, }) a.RegisterRoute("/ingester/ring", r, false, true, "GET", "POST") } +// RegisterIngesterPartitionRing registers the ring UI page associated with the ingester partitions ring. +func (a *API) RegisterIngesterPartitionRing(r http.Handler) { + a.indexPage.AddLinks(defaultWeight, "Ingester", []IndexPageLink{ + {Desc: "Partition ring status", Path: "/ingester/partition-ring"}, + }) + a.RegisterRoute("/ingester/partition-ring", r, false, true, "GET", "POST") +} + // RegisterStoreGateway registers the ring UI page associated with the store-gateway. func (a *API) RegisterStoreGateway(s *storegateway.StoreGateway) { storegatewaypb.RegisterStoreGatewayServer(a.server.GRPC, s) diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 8bb41337366..4677d0ee0db 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -76,6 +76,18 @@ func (pc *IndexPageContent) AddLinks(weight int, groupDesc string, links []Index pc.mu.Lock() defer pc.mu.Unlock() + // Append the links to the group if already existing. + for i, group := range pc.elements { + if group.Desc != groupDesc { + continue + } + + group.Links = append(group.Links, links...) + pc.elements[i] = group + return + } + + // The group hasn't been found. We create a new one. 
pc.elements = append(pc.elements, IndexPageLinkGroup{weight: weight, Desc: groupDesc, Links: links}) } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 644de6453c6..55145e6e619 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1458,7 +1458,8 @@ func (d *Distributor) sendToIngester(ctx context.Context, ingester ring.Instance // sendToStorage sends received data to the object storage, computing the partition based on the input ingester. // This function is used when ingest storage is enabled. func (d *Distributor) sendToStorage(ctx context.Context, userID string, ingester ring.InstanceDesc, req *mimirpb.WriteRequest) error { - partitionID, err := ingest.IngesterPartition(ingester.Id) + //nolint:staticcheck + partitionID, err := ingest.IngesterZonalPartition(ingester.Id) if err != nil { return err } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index f08aec4fd88..8ba9d762c9b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -25,6 +25,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/kv" "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" @@ -87,6 +88,10 @@ const ( // IngesterRingKey is the key under which we store the ingesters ring in the KVStore. IngesterRingKey = "ring" + // PartitionRingKey is the key under which we store the partitions ring used by the "ingest storage". + PartitionRingKey = "ingester-partitions" + PartitionRingName = "ingester-partitions" + // Jitter applied to the idle timeout to prevent compaction in all ingesters concurrently. compactionIdleTimeoutJitter = 0.25 @@ -147,7 +152,8 @@ type requestWithUsersAndCallback struct { // Config for an Ingester. type Config struct { - IngesterRing RingConfig `yaml:"ring"` + IngesterRing RingConfig `yaml:"ring"` + IngesterPartitionRing PartitionRingConfig `yaml:"partition_ring" category:"experimental" doc:"hidden"` // Config for metadata purging. MetadataRetainPeriod time.Duration `yaml:"metadata_retain_period" category:"advanced"` @@ -189,6 +195,7 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { cfg.IngesterRing.RegisterFlags(f, logger) + cfg.IngesterPartitionRing.RegisterFlags(f) cfg.DefaultLimits.RegisterFlags(f) cfg.ActiveSeriesMetrics.RegisterFlags(f) @@ -313,7 +320,10 @@ type Ingester struct { errorSamplers ingesterErrSamplers - ingestReader *ingest.PartitionReader + // The following is used by ingest storage (when enabled). 
+ ingestReader *ingest.PartitionReader + ingestPartitionID int32 + ingestPartitionLifecycler *ring.PartitionInstanceLifecycler } func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { @@ -418,14 +428,37 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, if ingestCfg := cfg.IngestStorageConfig; ingestCfg.Enabled { kafkaCfg := ingestCfg.KafkaConfig - partitionID, err := ingest.IngesterPartition(cfg.IngesterRing.InstanceID) + //nolint:staticcheck + legacyPartitionID, err := ingest.IngesterZonalPartition(cfg.IngesterRing.InstanceID) if err != nil { - return nil, errors.Wrap(err, "calculating ingest storage partition ID") + return nil, errors.Wrap(err, "calculating ingester legacy partition ID") } - i.ingestReader, err = ingest.NewPartitionReaderForPusher(kafkaCfg, partitionID, i, log.With(logger, "component", "ingest_reader"), registerer) + + i.ingestReader, err = ingest.NewPartitionReaderForPusher(kafkaCfg, legacyPartitionID, i, log.With(logger, "component", "ingest_reader"), registerer) if err != nil { return nil, errors.Wrap(err, "creating ingest storage reader") } + + i.ingestPartitionID, err = ingest.IngesterPartitionID(cfg.IngesterRing.InstanceID) + if err != nil { + return nil, errors.Wrap(err, "calculating ingester partition ID") + } + + partitionRingKV := cfg.IngesterPartitionRing.kvMock + if partitionRingKV == nil { + partitionRingKV, err = kv.NewClient(cfg.IngesterRing.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, PartitionRingName+"-lifecycler"), logger) + if err != nil { + return nil, errors.Wrap(err, "creating KV store for ingester partition ring") + } + } + + i.ingestPartitionLifecycler = ring.NewPartitionInstanceLifecycler( + i.cfg.IngesterPartitionRing.ToLifecyclerConfig(i.ingestPartitionID, cfg.IngesterRing.InstanceID), + PartitionRingName, + PartitionRingKey, + partitionRingKV, + logger, + prometheus.WrapRegistererWithPrefix("cortex_", registerer)) } i.BasicService = services.NewBasicService(i.starting, i.updateLoop, i.stopping) @@ -470,6 +503,16 @@ func (i *Ingester) starting(ctx context.Context) (err error) { _ = services.StopAndAwaitTerminated(context.Background(), i.lifecycler) } }() + + // First of all we have to check if the shutdown marker is set. This needs to be done + // as first thing because, if found, it may change the behaviour of the ingester startup. + if exists, err := shutdownmarker.Exists(shutdownmarker.GetPath(i.cfg.BlocksStorageConfig.TSDB.Dir)); err != nil { + return errors.Wrap(err, "failed to check ingester shutdown marker") + } else if exists { + level.Info(i.logger).Log("msg", "detected existing shutdown marker, setting unregister and flush on shutdown", "path", shutdownmarker.GetPath(i.cfg.BlocksStorageConfig.TSDB.Dir)) + i.setPrepareShutdown() + } + if err := i.openExistingTSDB(ctx); err != nil { // Try to rollback and close opened TSDBs before halting the ingester. 
 		i.closeAllTSDB()
@@ -542,15 +585,8 @@ func (i *Ingester) starting(ctx context.Context) (err error) {
 		servs = append(servs, i.ingestReader)
 	}
 
-	shutdownMarkerPath := shutdownmarker.GetPath(i.cfg.BlocksStorageConfig.TSDB.Dir)
-	shutdownMarkerFound, err := shutdownmarker.Exists(shutdownMarkerPath)
-	if err != nil {
-		return errors.Wrap(err, "failed to check ingester shutdown marker")
-	}
-
-	if shutdownMarkerFound {
-		level.Info(i.logger).Log("msg", "detected existing shutdown marker, setting unregister and flush on shutdown", "path", shutdownMarkerPath)
-		i.setPrepareShutdown()
+	if i.ingestPartitionLifecycler != nil {
+		servs = append(servs, i.ingestPartitionLifecycler)
 	}
 
 	i.subservices, err = services.NewManager(servs...)
@@ -3265,6 +3301,13 @@ func (i *Ingester) PrepareShutdownHandler(w http.ResponseWriter, r *http.Request
 		w.WriteHeader(http.StatusNoContent)
 
 	case http.MethodDelete:
+		// Reverting the prepared shutdown is currently not supported by the ingest storage.
+		if i.cfg.IngestStorageConfig.Enabled {
+			level.Error(i.logger).Log("msg", "the ingest storage doesn't support reverting the prepared shutdown")
+			w.WriteHeader(http.StatusMethodNotAllowed)
+			return
+		}
+
 		if err := shutdownmarker.Remove(shutdownMarkerPath); err != nil {
 			level.Error(i.logger).Log("msg", "unable to remove prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
 			w.WriteHeader(http.StatusInternalServerError)
@@ -3285,6 +3328,18 @@ func (i *Ingester) setPrepareShutdown() {
 	i.lifecycler.SetUnregisterOnShutdown(true)
 	i.lifecycler.SetFlushOnShutdown(true)
 	i.metrics.shutdownMarker.Set(1)
+
+	if i.ingestPartitionLifecycler != nil {
+		// When the prepare shutdown endpoint is called, there are two changes in the partitions ring behavior:
+		//
+		// 1. If setPrepareShutdown() is called at startup, because of the shutdown marker found on disk,
+		//    the ingester shouldn't create the partition if it doesn't exist, because we expect the ingester
+		//    to be scaled down shortly after.
+		// 2. When the ingester shuts down, we'll have to remove the ingester from the partition owners,
+		//    because we expect the ingester to be scaled down.
+		i.ingestPartitionLifecycler.SetCreatePartitionOnStartup(false)
+		i.ingestPartitionLifecycler.SetRemoveOwnerOnShutdown(true)
+	}
 }
 
 func (i *Ingester) unsetPrepareShutdown() {
@@ -3293,6 +3348,88 @@ func (i *Ingester) unsetPrepareShutdown() {
 	i.metrics.shutdownMarker.Set(0)
 }
 
+// PreparePartitionDownscaleHandler prepares the ingester's partition downscaling. The partition owned by the
+// ingester will switch to INACTIVE state (read-only).
+//
+// The following methods are supported:
+//
+// - GET
+// Returns the timestamp when the partition switched to INACTIVE state, or 0 if the partition is not in INACTIVE state.
+//
+// - POST
+// Switches the partition to INACTIVE state (if not already), and returns the timestamp when the switch to
+// INACTIVE state happened.
+//
+// - DELETE
+// Sets the partition back to ACTIVE state.
+func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) {
+	logger := log.With(i.logger, "partition", i.ingestPartitionID)
+
+	// Don't allow callers to change the shutdown configuration while we're in the middle
+	// of starting or shutting down.
+	if i.State() != services.Running {
+		w.WriteHeader(http.StatusServiceUnavailable)
+		return
+	}
+
+	if !i.cfg.IngestStorageConfig.Enabled {
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		return
+	}
+
+	switch r.Method {
+	case http.MethodPost:
+		// It's not allowed to prepare the downscale while in PENDING state. Why? Because if the downscale
+		// is later cancelled, we don't know whether it was requested in PENDING or ACTIVE state, so we
+		// don't know which state to revert back to. Given a partition is expected to stay in PENDING state
+		// for a short period, we simply don't allow this case.
+		state, _, err := i.ingestPartitionLifecycler.GetPartitionState(r.Context())
+		if err != nil {
+			level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
+			w.WriteHeader(http.StatusInternalServerError)
+			return
+		}
+
+		if state == ring.PartitionPending {
+			level.Warn(logger).Log("msg", "received a request to prepare partition for shutdown, but the request can't be satisfied because the partition is in PENDING state")
+			w.WriteHeader(http.StatusConflict)
+			return
+		}
+
+		if err := i.ingestPartitionLifecycler.ChangePartitionState(r.Context(), ring.PartitionInactive); err != nil {
+			level.Error(logger).Log("msg", "failed to change partition state to inactive", "err", err)
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+	case http.MethodDelete:
+		// We don't switch it back to PENDING state if there are not enough owners because we want to guarantee consistency
+		// in the read path. If the partition is within the lookback period we need to guarantee that the partition will be queried.
+		// Moving back to PENDING would cause us to lose consistency, because PENDING partitions are not queried by design.
+		// We could move back to PENDING if there are not enough owners and the partition moved to INACTIVE more than
+		// the "lookback period" ago, but since we delete inactive partitions with no owners that have been INACTIVE for longer
+		// than the "lookback period", it looks like an edge case not worth addressing.
+		if err := i.ingestPartitionLifecycler.ChangePartitionState(r.Context(), ring.PartitionActive); err != nil {
+			level.Error(logger).Log("msg", "failed to change partition state to active", "err", err)
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+	}
+
+	state, stateTimestamp, err := i.ingestPartitionLifecycler.GetPartitionState(r.Context())
+	if err != nil {
+		level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	if state == ring.PartitionInactive {
+		util.WriteJSONResponse(w, map[string]any{"timestamp": stateTimestamp.Unix()})
+	} else {
+		util.WriteJSONResponse(w, map[string]any{"timestamp": 0})
+	}
+}
+
 // ShutdownHandler triggers the following set of operations in order:
 // - Change the state of ring to stop accepting writes.
 // - Flush all the chunks.
diff --git a/pkg/ingester/ingester_activity.go b/pkg/ingester/ingester_activity.go index 3d03fb7a45a..cfb0f816d6d 100644 --- a/pkg/ingester/ingester_activity.go +++ b/pkg/ingester/ingester_activity.go @@ -157,6 +157,15 @@ func (i *ActivityTrackerWrapper) PrepareShutdownHandler(w http.ResponseWriter, r i.ing.PrepareShutdownHandler(w, r) } +func (i *ActivityTrackerWrapper) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) { + ix := i.tracker.Insert(func() string { + return requestActivity(r.Context(), "Ingester/PreparePartitionDownscaleHandler()", nil) + }) + defer i.tracker.Delete(ix) + + i.ing.PreparePartitionDownscaleHandler(w, r) +} + func (i *ActivityTrackerWrapper) ShutdownHandler(w http.ResponseWriter, r *http.Request) { ix := i.tracker.Insert(func() string { return requestActivity(r.Context(), "Ingester/ShutdownHandler", nil) diff --git a/pkg/ingester/ingester_ingest_storage_test.go b/pkg/ingester/ingester_ingest_storage_test.go index ec0c4e4655e..6856ce660c1 100644 --- a/pkg/ingester/ingester_ingest_storage_test.go +++ b/pkg/ingester/ingester_ingest_storage_test.go @@ -3,18 +3,26 @@ package ingester import ( + "bytes" "context" "errors" "fmt" + "net/http" + "net/http/httptest" + "os" + "slices" "sync" "testing" "time" "github.com/go-kit/log" + "github.com/grafana/dskit/kv/consul" + "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/grafana/dskit/test" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" @@ -26,6 +34,7 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/storage/ingest" + "github.com/grafana/mimir/pkg/util/shutdownmarker" util_test "github.com/grafana/mimir/pkg/util/test" "github.com/grafana/mimir/pkg/util/testkafka" "github.com/grafana/mimir/pkg/util/validation" @@ -91,7 +100,8 @@ func TestIngester_QueryStream_IngestStorageReadConsistency(t *testing.T) { require.NoError(t, services.StopAndAwaitTerminated(ctx, writer)) }) - partitionID, err := ingest.IngesterPartition(cfg.IngesterRing.InstanceID) + //nolint:staticcheck + partitionID, err := ingest.IngesterZonalPartition(cfg.IngesterRing.InstanceID) require.NoError(t, err) require.NoError(t, writer.WriteSync(ctx, partitionID, userID, &mimirpb.WriteRequest{Timeseries: []mimirpb.PreallocTimeseries{series1}, Source: mimirpb.API})) @@ -135,6 +145,213 @@ func TestIngester_QueryStream_IngestStorageReadConsistency(t *testing.T) { } } +func TestIngester_PrepareShutdownHandler_IngestStorageSupport(t *testing.T) { + ctx := context.Background() + + reg := prometheus.NewPedanticRegistry() + overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + // Start ingester. + cfg := defaultIngesterTestConfig(t) + ingester, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, reg) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, ingester)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, ingester)) + }) + + // Start a watcher used to assert on the partitions ring. 
+ watcher := ring.NewPartitionRingWatcher(PartitionRingName, PartitionRingKey, cfg.IngesterPartitionRing.kvMock, log.NewNopLogger(), nil) + require.NoError(t, services.StartAndAwaitRunning(ctx, watcher)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, watcher)) + }) + + // Wait until it's healthy + test.Poll(t, 1*time.Second, 1, func() interface{} { + return ingester.lifecycler.HealthyInstancesCount() + }) + + t.Run("should not allow to cancel the prepare shutdown, because unsupported by the ingest storage", func(t *testing.T) { + res := httptest.NewRecorder() + ingester.PrepareShutdownHandler(res, httptest.NewRequest(http.MethodDelete, "/ingester/prepare-shutdown", nil)) + require.Equal(t, http.StatusMethodNotAllowed, res.Code) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_ingester_prepare_shutdown_requested If the ingester has been requested to prepare for shutdown via endpoint or marker file. + # TYPE cortex_ingester_prepare_shutdown_requested gauge + cortex_ingester_prepare_shutdown_requested 0 + `), "cortex_ingester_prepare_shutdown_requested")) + }) + + t.Run("should remove the ingester from partition owners on a prepared shutdown", func(t *testing.T) { + res := httptest.NewRecorder() + ingester.PrepareShutdownHandler(res, httptest.NewRequest(http.MethodPost, "/ingester/prepare-shutdown", nil)) + require.Equal(t, 204, res.Code) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_ingester_prepare_shutdown_requested If the ingester has been requested to prepare for shutdown via endpoint or marker file. + # TYPE cortex_ingester_prepare_shutdown_requested gauge + cortex_ingester_prepare_shutdown_requested 1 + `), "cortex_ingester_prepare_shutdown_requested")) + + // Pre-condition: the ingester should be registered as owner in the ring. + require.Eventually(t, func() bool { + return slices.Equal(watcher.PartitionRing().PartitionOwnerIDs(0), []string{"ingester-zone-a-0"}) + }, time.Second, 10*time.Millisecond) + + // Shutdown ingester. + require.NoError(t, services.StopAndAwaitTerminated(ctx, ingester)) + + // We expect the ingester to be removed from partition owners. + require.Eventually(t, func() bool { + return slices.Equal(watcher.PartitionRing().PartitionOwnerIDs(0), []string{}) + }, time.Second, 10*time.Millisecond) + }) +} + +func TestIngester_PreparePartitionDownscaleHandler(t *testing.T) { + ctx := context.Background() + + overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + setup := func(t *testing.T, cfg Config) (*Ingester, *ring.PartitionRingWatcher) { + // Start ingester. + ingester, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, prometheus.NewPedanticRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, ingester)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, ingester)) + }) + + // Start a watcher used to assert on the partitions ring. 
+ watcher := ring.NewPartitionRingWatcher(PartitionRingName, PartitionRingKey, cfg.IngesterPartitionRing.kvMock, log.NewNopLogger(), nil) + require.NoError(t, services.StartAndAwaitRunning(ctx, watcher)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, watcher)) + }) + + // Wait until it's healthy + test.Poll(t, 1*time.Second, 1, func() interface{} { + return ingester.lifecycler.HealthyInstancesCount() + }) + + return ingester, watcher + } + + t.Run("POST request should switch the partition state to INACTIVE", func(t *testing.T) { + t.Parallel() + + ingester, watcher := setup(t, defaultIngesterTestConfig(t)) + + // Pre-condition: the partition is ACTIVE. + require.Eventually(t, func() bool { + return slices.Equal(watcher.PartitionRing().ActivePartitionIDs(), []int32{0}) + }, time.Second, 10*time.Millisecond) + + res := httptest.NewRecorder() + ingester.PreparePartitionDownscaleHandler(res, httptest.NewRequest(http.MethodPost, "/ingester/prepare-partition-downscale", nil)) + require.Equal(t, http.StatusOK, res.Code) + + // We expect the partition to switch to INACTIVE. + require.Eventually(t, func() bool { + return slices.Equal(watcher.PartitionRing().InactivePartitionIDs(), []int32{0}) + }, time.Second, 10*time.Millisecond) + }) + + t.Run("DELETE request after a POST request should switch the partition back to ACTIVE state", func(t *testing.T) { + t.Parallel() + + ingester, watcher := setup(t, defaultIngesterTestConfig(t)) + + // Pre-condition: the partition is ACTIVE. + require.Eventually(t, func() bool { + return slices.Equal(watcher.PartitionRing().ActivePartitionIDs(), []int32{0}) + }, time.Second, 10*time.Millisecond) + + res := httptest.NewRecorder() + ingester.PreparePartitionDownscaleHandler(res, httptest.NewRequest(http.MethodPost, "/ingester/prepare-partition-downscale", nil)) + require.Equal(t, http.StatusOK, res.Code) + + // We expect the partition to switch to INACTIVE. + require.Eventually(t, func() bool { + return slices.Equal(watcher.PartitionRing().InactivePartitionIDs(), []int32{0}) + }, time.Second, 10*time.Millisecond) + + res = httptest.NewRecorder() + ingester.PreparePartitionDownscaleHandler(res, httptest.NewRequest(http.MethodDelete, "/ingester/prepare-partition-downscale", nil)) + require.Equal(t, http.StatusOK, res.Code) + + // We expect the partition to switch to ACTIVE. + require.Eventually(t, func() bool { + return slices.Equal(watcher.PartitionRing().ActivePartitionIDs(), []int32{0}) + }, time.Second, 10*time.Millisecond) + }) + + t.Run("POST request should be rejected if the partition is in PENDING state", func(t *testing.T) { + t.Parallel() + + // To keep the partition in PENDING state we set a minimum number of owners + // higher than the actual number of ingesters we're going to run. + cfg := defaultIngesterTestConfig(t) + cfg.IngesterPartitionRing.MinOwnersCount = 2 + + ingester, watcher := setup(t, cfg) + + // Pre-condition: the partition is PENDING. + require.Eventually(t, func() bool { + return slices.Equal(watcher.PartitionRing().PendingPartitionIDs(), []int32{0}) + }, time.Second, 10*time.Millisecond) + + res := httptest.NewRecorder() + ingester.PreparePartitionDownscaleHandler(res, httptest.NewRequest(http.MethodPost, "/ingester/prepare-partition-downscale", nil)) + require.Equal(t, http.StatusConflict, res.Code) + + // We expect the partition to be in PENDING state. 
+		require.Eventually(t, func() bool {
+			return slices.Equal(watcher.PartitionRing().PendingPartitionIDs(), []int32{0})
+		}, time.Second, 10*time.Millisecond)
+	})
+}
+
+func TestIngester_ShouldNotCreatePartitionIfThereIsShutdownMarker(t *testing.T) {
+	ctx := context.Background()
+
+	overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
+	require.NoError(t, err)
+
+	cfg := defaultIngesterTestConfig(t)
+	ingester, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, prometheus.NewPedanticRegistry())
+
+	// Create the shutdown marker.
+	require.NoError(t, os.MkdirAll(cfg.BlocksStorageConfig.TSDB.Dir, os.ModePerm))
+	require.NoError(t, shutdownmarker.Create(shutdownmarker.GetPath(cfg.BlocksStorageConfig.TSDB.Dir)))
+
+	// Start ingester.
+	require.NoError(t, err)
+	require.NoError(t, ingester.StartAsync(ctx))
+	t.Cleanup(func() {
+		_ = services.StopAndAwaitTerminated(ctx, ingester)
+	})
+
+	// Start a watcher used to assert on the partitions ring.
+	watcher := ring.NewPartitionRingWatcher(PartitionRingName, PartitionRingKey, cfg.IngesterPartitionRing.kvMock, log.NewNopLogger(), nil)
+	require.NoError(t, services.StartAndAwaitRunning(ctx, watcher))
+	t.Cleanup(func() {
+		require.NoError(t, services.StopAndAwaitTerminated(ctx, watcher))
+	})
+
+	// No matter how long we wait, we expect the ingester service to hang in the starting state
+	// given it's not allowed to create the partition and the partition doesn't exist in the ring.
+	time.Sleep(10 * cfg.IngesterPartitionRing.lifecyclerPollingInterval)
+
+	assert.Equal(t, services.Starting, ingester.State())
+	assert.Empty(t, watcher.PartitionRing().PartitionIDs())
+	assert.Empty(t, watcher.PartitionRing().PartitionOwnerIDs(ingester.ingestPartitionID))
+}
+
 func createTestIngesterWithIngestStorage(t testing.TB, ingesterCfg *Config, overrides *validation.Overrides, reg prometheus.Registerer) (*Ingester, *kfake.Cluster) {
 	var (
 		dataDir = t.TempDir()
@@ -145,6 +362,14 @@ func createTestIngesterWithIngestStorage(t testing.TB, ingesterCfg *Config, over
 	ingesterCfg.IngestStorageConfig.KafkaConfig.Topic = "mimir"
 	ingesterCfg.IngestStorageConfig.KafkaConfig.LastProducedOffsetPollInterval = 100 * time.Millisecond
 
+	// Create the partition ring store.
+	kv, closer := consul.NewInMemoryClient(ring.GetPartitionRingCodec(), log.NewNopLogger(), nil)
+	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+
+	ingesterCfg.IngesterPartitionRing.kvMock = kv
+	ingesterCfg.IngesterPartitionRing.MinOwnersDuration = 0
+	ingesterCfg.IngesterPartitionRing.lifecyclerPollingInterval = 10 * time.Millisecond
+
 	// Create a fake Kafka cluster.
 	kafkaCluster, kafkaAddr := testkafka.CreateCluster(t, 10, ingesterCfg.IngestStorageConfig.KafkaConfig.Topic)
 	ingesterCfg.IngestStorageConfig.KafkaConfig.Address = kafkaAddr
diff --git a/pkg/ingester/ingester_partition_ring.go b/pkg/ingester/ingester_partition_ring.go
new file mode 100644
index 00000000000..9b17b4f86e1
--- /dev/null
+++ b/pkg/ingester/ingester_partition_ring.go
@@ -0,0 +1,46 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package ingester
+
+import (
+	"flag"
+	"time"
+
+	"github.com/grafana/dskit/kv"
+	"github.com/grafana/dskit/ring"
+)
+
+type PartitionRingConfig struct {
+	// MinOwnersCount maps to ring.PartitionInstanceLifecyclerConfig's WaitOwnersCountOnPending.
+	MinOwnersCount int `yaml:"min_partition_owners_count"`
+
+	// MinOwnersDuration maps to ring.PartitionInstanceLifecyclerConfig's WaitOwnersDurationOnPending.
+ MinOwnersDuration time.Duration `yaml:"min_partition_owners_duration"` + + // DeleteInactivePartitionAfter maps to ring.PartitionInstanceLifecyclerConfig's DeleteInactivePartitionAfterDuration. + DeleteInactivePartitionAfter time.Duration `yaml:"delete_inactive_partition_after"` + + // kvMock is a kv.Client mock used for testing. + kvMock kv.Client `yaml:"-"` + + // lifecyclerPollingInterval is the lifecycler polling interval. This setting is used to lower it in tests. + lifecyclerPollingInterval time.Duration +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *PartitionRingConfig) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.MinOwnersCount, "ingester.partition-ring.min-partition-owners-count", 1, "Minimum number of owners to wait before a PENDING partition gets switched to ACTIVE.") + f.DurationVar(&cfg.MinOwnersDuration, "ingester.partition-ring.min-partition-owners-duration", 10*time.Second, "How long the minimum number of owners should have been enforced before a PENDING partition gets switched to ACTIVE.") + f.DurationVar(&cfg.DeleteInactivePartitionAfter, "ingester.partition-ring.delete-inactive-partition-after", 13*time.Hour, "How long to wait before an INACTIVE partition is eligible for deletion. The partition will be deleted only if it has been in INACTIVE state for at least the configured duration and it has no owners registered. A value of 0 disables partitions deletion.") +} + +func (cfg *PartitionRingConfig) ToLifecyclerConfig(partitionID int32, instanceID string) ring.PartitionInstanceLifecyclerConfig { + return ring.PartitionInstanceLifecyclerConfig{ + PartitionID: partitionID, + InstanceID: instanceID, + WaitOwnersCountOnPending: cfg.MinOwnersCount, + WaitOwnersDurationOnPending: cfg.MinOwnersDuration, + DeleteInactivePartitionAfterDuration: cfg.DeleteInactivePartitionAfter, + PollingInterval: cfg.lifecyclerPollingInterval, + } +} diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index fb055a4fbe2..ed8fab65632 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5129,7 +5129,7 @@ func TestIngester_flushing(t *testing.T) { setupIngester func(cfg *Config) action func(t *testing.T, i *Ingester, reg *prometheus.Registry) }{ - "ingesterShutdown": { + "should flush blocks on shutdown when enabled through the configuration": { setupIngester: func(cfg *Config) { cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = true cfg.BlocksStorageConfig.TSDB.KeepUserTSDBOpenOnShutdown = true @@ -5164,7 +5164,7 @@ func TestIngester_flushing(t *testing.T) { }, }, - "prepareShutdownHandler": { + "should flush blocks on shutdown when enabled through the prepare shutdown API endpoint": { setupIngester: func(cfg *Config) { cfg.IngesterRing.UnregisterOnShutdown = false cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = false @@ -5238,7 +5238,7 @@ func TestIngester_flushing(t *testing.T) { }, }, - "shutdownHandler": { + "should flush blocks when the shutdown API endpoint is called": { setupIngester: func(cfg *Config) { cfg.IngesterRing.UnregisterOnShutdown = false cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = false @@ -5272,7 +5272,7 @@ func TestIngester_flushing(t *testing.T) { }, }, - "flushHandler": { + "should flush blocks for all tenants when the flush API endpoint is called without tenants list": { setupIngester: func(cfg *Config) { cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = false }, @@ -5305,7 +5305,7 @@ func TestIngester_flushing(t *testing.T) { }, }, - 
"flushHandlerWithListOfTenants": { + "should flush blocks for requested tenants when the flush API endpoint is called with tenants list": { setupIngester: func(cfg *Config) { cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = false }, @@ -5361,7 +5361,7 @@ func TestIngester_flushing(t *testing.T) { }, }, - "flushMultipleBlocksWithDataSpanning3Days": { + "should flush blocks spanning multiple days with flush API endpoint is called": { setupIngester: func(cfg *Config) { cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = false }, diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 3f7d121d507..28d323a3a67 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -685,35 +685,36 @@ type Mimir struct { ServiceMap map[string]services.Service ModuleManager *modules.Manager - API *api.API - Server *server.Server - IngesterRing *ring.Ring - TenantLimits validation.TenantLimits - Overrides *validation.Overrides - ActiveGroupsCleanup *util.ActiveGroupsCleanupService - Distributor *distributor.Distributor - Ingester *ingester.Ingester - Flusher *flusher.Flusher - FrontendV1 *frontendv1.Frontend - RuntimeConfig *runtimeconfig.Manager - QuerierQueryable prom_storage.SampleAndChunkQueryable - ExemplarQueryable prom_storage.ExemplarQueryable - MetadataSupplier querier.MetadataSupplier - QuerierEngine *promql.Engine - QueryFrontendTripperware querymiddleware.Tripperware - QueryFrontendCodec querymiddleware.Codec - Ruler *ruler.Ruler - RulerDirectStorage rulestore.RuleStore - RulerCachedStorage rulestore.RuleStore - Alertmanager *alertmanager.MultitenantAlertmanager - Compactor *compactor.MultitenantCompactor - StoreGateway *storegateway.StoreGateway - StoreQueryable prom_storage.Queryable - MemberlistKV *memberlist.KVInitService - ActivityTracker *activitytracker.ActivityTracker - Vault *vault.Vault - UsageStatsReporter *usagestats.Reporter - BuildInfoHandler http.Handler + API *api.API + Server *server.Server + IngesterRing *ring.Ring + IngesterPartitionRingWatcher *ring.PartitionRingWatcher + TenantLimits validation.TenantLimits + Overrides *validation.Overrides + ActiveGroupsCleanup *util.ActiveGroupsCleanupService + Distributor *distributor.Distributor + Ingester *ingester.Ingester + Flusher *flusher.Flusher + FrontendV1 *frontendv1.Frontend + RuntimeConfig *runtimeconfig.Manager + QuerierQueryable prom_storage.SampleAndChunkQueryable + ExemplarQueryable prom_storage.ExemplarQueryable + MetadataSupplier querier.MetadataSupplier + QuerierEngine *promql.Engine + QueryFrontendTripperware querymiddleware.Tripperware + QueryFrontendCodec querymiddleware.Codec + Ruler *ruler.Ruler + RulerDirectStorage rulestore.RuleStore + RulerCachedStorage rulestore.RuleStore + Alertmanager *alertmanager.MultitenantAlertmanager + Compactor *compactor.MultitenantCompactor + StoreGateway *storegateway.StoreGateway + StoreQueryable prom_storage.Queryable + MemberlistKV *memberlist.KVInitService + ActivityTracker *activitytracker.ActivityTracker + Vault *vault.Vault + UsageStatsReporter *usagestats.Reporter + BuildInfoHandler http.Handler } // New makes a new Mimir. 
@@ -836,9 +837,9 @@ func (t *Mimir) Run() error { // implementation provided by module.Ring over the BasicLifecycler // available in ingesters if t.IngesterRing != nil { - t.API.RegisterRing(t.IngesterRing) + t.API.RegisterIngesterRing(t.IngesterRing) } else if t.Ingester != nil { - t.API.RegisterRing(t.Ingester.RingHandler()) + t.API.RegisterIngesterRing(t.Ingester.RingHandler()) } // get all services, create service manager and tell it to start diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index ec1588e72cf..77c76cd7e39 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -17,6 +17,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/dns" httpgrpc_server "github.com/grafana/dskit/httpgrpc/server" + "github.com/grafana/dskit/kv" "github.com/grafana/dskit/kv/memberlist" "github.com/grafana/dskit/modules" "github.com/grafana/dskit/ring" @@ -70,6 +71,7 @@ const ( API string = "api" SanityCheck string = "sanity-check" IngesterRing string = "ingester-ring" + IngesterPartitionRing string = "ingester-partitions-ring" RuntimeConfig string = "runtime-config" Overrides string = "overrides" OverridesExporter string = "overrides-exporter" @@ -355,6 +357,24 @@ func (t *Mimir) initIngesterRing() (serv services.Service, err error) { return t.IngesterRing, nil } +func (t *Mimir) initIngesterPartitionRing() (services.Service, error) { + if !t.Cfg.IngestStorage.Enabled { + return nil, nil + } + + kvClient, err := kv.NewClient(t.Cfg.Ingester.IngesterRing.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(t.Registerer, ingester.PartitionRingName+"-watcher"), util_log.Logger) + if err != nil { + return nil, errors.Wrap(err, "creating KV store for ingester partitions ring watcher") + } + + t.IngesterPartitionRingWatcher = ring.NewPartitionRingWatcher(ingester.PartitionRingName, ingester.PartitionRingKey, kvClient, util_log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", t.Registerer)) + + // Expose a web page to view the partitions ring state. + t.API.RegisterIngesterPartitionRing(ring.NewPartitionRingPageHandler(t.IngesterPartitionRingWatcher)) + + return t.IngesterPartitionRingWatcher, nil +} + func (t *Mimir) initRuntimeConfig() (services.Service, error) { if len(t.Cfg.RuntimeConfig.LoadPath) == 0 { // no need to initialize module if load path is empty @@ -957,6 +977,9 @@ func (t *Mimir) initStoreGateway() (serv services.Service, err error) { func (t *Mimir) initMemberlistKV() (services.Service, error) { // Append to the list of codecs instead of overwriting the value to allow third parties to inject their own codecs. 
 	t.Cfg.MemberlistKV.Codecs = append(t.Cfg.MemberlistKV.Codecs, ring.GetCodec())
+	if t.Cfg.IngestStorage.Enabled {
+		t.Cfg.MemberlistKV.Codecs = append(t.Cfg.MemberlistKV.Codecs, ring.GetPartitionRingCodec())
+	}
 
 	dnsProviderReg := prometheus.WrapRegistererWithPrefix(
 		"cortex_",
@@ -1030,6 +1053,7 @@ func (t *Mimir) setupModuleManager() error {
 	mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule)
 	mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule)
 	mm.RegisterModule(IngesterRing, t.initIngesterRing, modules.UserInvisibleModule)
+	mm.RegisterModule(IngesterPartitionRing, t.initIngesterPartitionRing, modules.UserInvisibleModule)
 	mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule)
 	mm.RegisterModule(OverridesExporter, t.initOverridesExporter)
 	mm.RegisterModule(ActiveGroupsCleanupService, t.initActiveGroupsCleanupService, modules.UserInvisibleModule)
@@ -1064,14 +1088,15 @@ func (t *Mimir) setupModuleManager() error {
 		MemberlistKV:             {API, Vault},
 		RuntimeConfig:            {API},
 		IngesterRing:             {API, RuntimeConfig, MemberlistKV, Vault},
+		IngesterPartitionRing:    {MemberlistKV, API},
 		Overrides:                {RuntimeConfig},
 		OverridesExporter:        {Overrides, MemberlistKV, Vault},
 		Distributor:              {DistributorService, API, ActiveGroupsCleanupService, Vault},
-		DistributorService:       {IngesterRing, Overrides, Vault},
+		DistributorService:       {IngesterRing, IngesterPartitionRing, Overrides, Vault},
 		Ingester:                 {IngesterService, API, ActiveGroupsCleanupService, Vault},
-		IngesterService:          {IngesterRing, Overrides, RuntimeConfig, MemberlistKV},
+		IngesterService:          {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV},
 		Flusher:                  {Overrides, API},
-		Queryable:                {Overrides, DistributorService, IngesterRing, API, StoreQueryable, MemberlistKV},
+		Queryable:                {Overrides, DistributorService, IngesterRing, IngesterPartitionRing, API, StoreQueryable, MemberlistKV},
 		Querier:                  {TenantFederation, Vault},
 		StoreQueryable:           {Overrides, MemberlistKV},
 		QueryFrontendTripperware: {API, Overrides},
diff --git a/pkg/storage/ingest/util.go b/pkg/storage/ingest/util.go
index d571d011d8a..ccd05980cec 100644
--- a/pkg/storage/ingest/util.go
+++ b/pkg/storage/ingest/util.go
@@ -31,11 +31,13 @@ var (
 	}
 )
 
-// IngesterPartition returns the partition ID to use to write to a specific ingester partition.
+// IngesterZonalPartition returns the partition ID to use to write to a specific ingester partition.
 // The input ingester ID is expected to end either with "zone-X-Y" or only "-Y" where "X" is a letter in the range [a,d]
 // and "Y" is a positive integer number. This means that this function supports up to 4 zones starting
 // with letter "a" or no zones at all.
-func IngesterPartition(ingesterID string) (int32, error) {
+//
+// Deprecated: the experimental ingest storage is moving to partition IDs which are not based on the ingester zone.
+func IngesterZonalPartition(ingesterID string) (int32, error) {
 	match := ingesterIDRegexp.FindStringSubmatch(ingesterID)
 	if len(match) == 0 {
 		return 0, fmt.Errorf("name doesn't match regular expression %s %q", ingesterID, ingesterIDRegexp.String())
@@ -64,6 +66,22 @@
 	return partitionID, nil
 }
 
+// IngesterPartitionID returns the partition ID owned by the given ingester.
+func IngesterPartitionID(ingesterID string) (int32, error) { + match := ingesterIDRegexp.FindStringSubmatch(ingesterID) + if len(match) == 0 { + return 0, fmt.Errorf("ingester ID %s doesn't match regular expression %q", ingesterID, ingesterIDRegexp.String()) + } + + // Parse the ingester sequence number. + ingesterSeq, err := strconv.Atoi(match[2]) + if err != nil { + return 0, fmt.Errorf("no ingester sequence number in ingester ID %s", ingesterID) + } + + return int32(ingesterSeq), nil +} + func commonKafkaClientOptions(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Logger) []kgo.Opt { return []kgo.Opt{ kgo.ClientID(cfg.ClientID), diff --git a/pkg/storage/ingest/util_test.go b/pkg/storage/ingest/util_test.go index 2acee7b66df..17bd84407ba 100644 --- a/pkg/storage/ingest/util_test.go +++ b/pkg/storage/ingest/util_test.go @@ -13,86 +13,135 @@ import ( "github.com/stretchr/testify/require" ) -func TestIngesterPartition(t *testing.T) { +func TestIngesterZonalPartition(t *testing.T) { t.Run("with zones", func(t *testing.T) { - actual, err := IngesterPartition("ingester-zone-a-0") + actual, err := IngesterZonalPartition("ingester-zone-a-0") require.NoError(t, err) assert.EqualValues(t, 0, actual) - actual, err = IngesterPartition("ingester-zone-b-0") + actual, err = IngesterZonalPartition("ingester-zone-b-0") require.NoError(t, err) assert.EqualValues(t, 1, actual) - actual, err = IngesterPartition("ingester-zone-c-0") + actual, err = IngesterZonalPartition("ingester-zone-c-0") require.NoError(t, err) assert.EqualValues(t, 2, actual) - actual, err = IngesterPartition("ingester-zone-a-1") + actual, err = IngesterZonalPartition("ingester-zone-a-1") require.NoError(t, err) assert.EqualValues(t, 4, actual) - actual, err = IngesterPartition("ingester-zone-b-1") + actual, err = IngesterZonalPartition("ingester-zone-b-1") require.NoError(t, err) assert.EqualValues(t, 5, actual) - actual, err = IngesterPartition("ingester-zone-c-1") + actual, err = IngesterZonalPartition("ingester-zone-c-1") require.NoError(t, err) assert.EqualValues(t, 6, actual) - actual, err = IngesterPartition("ingester-zone-a-2") + actual, err = IngesterZonalPartition("ingester-zone-a-2") require.NoError(t, err) assert.EqualValues(t, 8, actual) - actual, err = IngesterPartition("ingester-zone-b-2") + actual, err = IngesterZonalPartition("ingester-zone-b-2") require.NoError(t, err) assert.EqualValues(t, 9, actual) - actual, err = IngesterPartition("ingester-zone-c-2") + actual, err = IngesterZonalPartition("ingester-zone-c-2") require.NoError(t, err) assert.EqualValues(t, 10, actual) - actual, err = IngesterPartition("mimir-write-zone-a-1") + actual, err = IngesterZonalPartition("mimir-write-zone-a-1") require.NoError(t, err) assert.EqualValues(t, 4, actual) - actual, err = IngesterPartition("mimir-write-zone-b-1") + actual, err = IngesterZonalPartition("mimir-write-zone-b-1") require.NoError(t, err) assert.EqualValues(t, 5, actual) - actual, err = IngesterPartition("mimir-write-zone-c-1") + actual, err = IngesterZonalPartition("mimir-write-zone-c-1") require.NoError(t, err) assert.EqualValues(t, 6, actual) }) t.Run("without zones", func(t *testing.T) { - actual, err := IngesterPartition("ingester-0") + actual, err := IngesterZonalPartition("ingester-0") require.NoError(t, err) assert.EqualValues(t, 0, actual) - actual, err = IngesterPartition("ingester-1") + actual, err = IngesterZonalPartition("ingester-1") require.NoError(t, err) assert.EqualValues(t, 4, actual) - actual, err = IngesterPartition("mimir-write-0") + actual, 
err = IngesterZonalPartition("mimir-write-0") require.NoError(t, err) assert.EqualValues(t, 0, actual) - actual, err = IngesterPartition("mimir-write-1") + actual, err = IngesterZonalPartition("mimir-write-1") require.NoError(t, err) assert.EqualValues(t, 4, actual) }) t.Run("should return error if the ingester ID has a non supported format", func(t *testing.T) { - _, err := IngesterPartition("unknown") + _, err := IngesterZonalPartition("unknown") require.Error(t, err) - _, err = IngesterPartition("ingester-zone-X-0") + _, err = IngesterZonalPartition("ingester-zone-X-0") require.Error(t, err) - _, err = IngesterPartition("ingester-zone-a-") + _, err = IngesterZonalPartition("ingester-zone-a-") require.Error(t, err) - _, err = IngesterPartition("ingester-zone-a") + _, err = IngesterZonalPartition("ingester-zone-a") + require.Error(t, err) + }) +} + +func TestIngesterPartitionID(t *testing.T) { + t.Run("with zones", func(t *testing.T) { + actual, err := IngesterPartitionID("ingester-zone-a-0") + require.NoError(t, err) + assert.EqualValues(t, 0, actual) + + actual, err = IngesterPartitionID("ingester-zone-b-0") + require.NoError(t, err) + assert.EqualValues(t, 0, actual) + + actual, err = IngesterPartitionID("ingester-zone-a-1") + require.NoError(t, err) + assert.EqualValues(t, 1, actual) + + actual, err = IngesterPartitionID("ingester-zone-b-1") + require.NoError(t, err) + assert.EqualValues(t, 1, actual) + + actual, err = IngesterPartitionID("mimir-write-zone-c-2") + require.NoError(t, err) + assert.EqualValues(t, 2, actual) + }) + + t.Run("without zones", func(t *testing.T) { + actual, err := IngesterPartitionID("ingester-0") + require.NoError(t, err) + assert.EqualValues(t, 0, actual) + + actual, err = IngesterPartitionID("ingester-1") + require.NoError(t, err) + assert.EqualValues(t, 1, actual) + + actual, err = IngesterPartitionID("mimir-write-2") + require.NoError(t, err) + assert.EqualValues(t, 2, actual) + }) + + t.Run("should return error if the ingester ID has a non supported format", func(t *testing.T) { + _, err := IngesterPartitionID("unknown") + require.Error(t, err) + + _, err = IngesterPartitionID("ingester-zone-a-") + require.Error(t, err) + + _, err = IngesterPartitionID("ingester-zone-a") require.Error(t, err) }) } diff --git a/vendor/github.com/grafana/dskit/ring/partition_instance_lifecycler.go b/vendor/github.com/grafana/dskit/ring/partition_instance_lifecycler.go new file mode 100644 index 00000000000..ac8a65d066f --- /dev/null +++ b/vendor/github.com/grafana/dskit/ring/partition_instance_lifecycler.go @@ -0,0 +1,418 @@ +package ring + +import ( + "context" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.uber.org/atomic" + + "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/services" +) + +var ( + ErrPartitionDoesNotExist = errors.New("the partition does not exist") + ErrPartitionStateMismatch = errors.New("the partition state does not match the expected one") + ErrPartitionStateChangeNotAllowed = errors.New("partition state change not allowed") + + allowedPartitionStateChanges = map[PartitionState][]PartitionState{ + PartitionPending: {PartitionActive, PartitionInactive}, + PartitionActive: {PartitionInactive}, + PartitionInactive: {PartitionPending, PartitionActive}, + } +) + +type PartitionInstanceLifecyclerConfig struct { + // PartitionID is the ID of the partition managed by the lifecycler. 
+	PartitionID int32
+
+	// InstanceID is the ID of the instance managed by the lifecycler.
+	InstanceID string
+
+	// WaitOwnersCountOnPending is the minimum number of owners to wait before switching a
+	// PENDING partition to ACTIVE.
+	WaitOwnersCountOnPending int
+
+	// WaitOwnersDurationOnPending is how long each owner should have been added to the
+	// partition before it's considered eligible for the WaitOwnersCountOnPending count.
+	WaitOwnersDurationOnPending time.Duration
+
+	// DeleteInactivePartitionAfterDuration is how long the lifecycler should wait before
+	// deleting inactive partitions with no owners. Inactive partitions are never removed
+	// if this value is 0.
+	DeleteInactivePartitionAfterDuration time.Duration
+
+	// PollingInterval is the internal polling interval. This setting is useful to let
+	// upstream projects lower it in unit tests.
+	PollingInterval time.Duration
+}
+
+// PartitionInstanceLifecycler is responsible for managing the lifecycle of a single
+// partition and partition owner in the ring.
+type PartitionInstanceLifecycler struct {
+	*services.BasicService
+
+	// These values are initialised at startup, and never change.
+	cfg      PartitionInstanceLifecyclerConfig
+	ringName string
+	ringKey  string
+	store    kv.Client
+	logger   log.Logger
+
+	// Channel used to execute logic within the lifecycler loop.
+	actorChan chan func()
+
+	// Whether the partition should be created on startup if it doesn't exist yet.
+	createPartitionOnStartup *atomic.Bool
+
+	// Whether the lifecycler should remove the partition owner (identified by instance ID) on shutdown.
+	removeOwnerOnShutdown *atomic.Bool
+
+	// Metrics.
+	reconcilesTotal       *prometheus.CounterVec
+	reconcilesFailedTotal *prometheus.CounterVec
+}
+
+func NewPartitionInstanceLifecycler(cfg PartitionInstanceLifecyclerConfig, ringName, ringKey string, store kv.Client, logger log.Logger, reg prometheus.Registerer) *PartitionInstanceLifecycler {
+	if cfg.PollingInterval == 0 {
+		cfg.PollingInterval = 5 * time.Second
+	}
+
+	l := &PartitionInstanceLifecycler{
+		cfg:                      cfg,
+		ringName:                 ringName,
+		ringKey:                  ringKey,
+		store:                    store,
+		logger:                   log.With(logger, "ring", ringName),
+		actorChan:                make(chan func()),
+		createPartitionOnStartup: atomic.NewBool(true),
+		removeOwnerOnShutdown:    atomic.NewBool(false),
+		reconcilesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name:        "partition_ring_lifecycler_reconciles_total",
+			Help:        "Total number of reconciliations started.",
+			ConstLabels: map[string]string{"name": ringName},
+		}, []string{"type"}),
+		reconcilesFailedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name:        "partition_ring_lifecycler_reconciles_failed_total",
+			Help:        "Total number of reconciliations failed.",
+			ConstLabels: map[string]string{"name": ringName},
+		}, []string{"type"}),
+	}
+
+	l.BasicService = services.NewBasicService(l.starting, l.running, l.stopping)
+
+	return l
+}
+
+// CreatePartitionOnStartup returns whether the lifecycler creates the partition on startup
+// if it doesn't exist.
+func (l *PartitionInstanceLifecycler) CreatePartitionOnStartup() bool {
+	return l.createPartitionOnStartup.Load()
+}
+
+// SetCreatePartitionOnStartup sets whether the lifecycler should create the partition on
+// startup if it doesn't exist.
+func (l *PartitionInstanceLifecycler) SetCreatePartitionOnStartup(create bool) { + l.createPartitionOnStartup.Store(create) +} + +// RemoveOwnerOnShutdown returns whether the lifecycler has been configured to remove the partition +// owner on shutdown. +func (l *PartitionInstanceLifecycler) RemoveOwnerOnShutdown() bool { + return l.removeOwnerOnShutdown.Load() +} + +// SetRemoveOwnerOnShutdown sets whether the lifecycler should remove the partition owner on shutdown. +func (l *PartitionInstanceLifecycler) SetRemoveOwnerOnShutdown(remove bool) { + l.removeOwnerOnShutdown.Store(remove) +} + +// GetPartitionState returns the current state of the partition, and the timestamp when the state was +// changed the last time. +func (l *PartitionInstanceLifecycler) GetPartitionState(ctx context.Context) (PartitionState, time.Time, error) { + ring, err := l.getRing(ctx) + if err != nil { + return PartitionUnknown, time.Time{}, err + } + + partition, exists := ring.Partitions[l.cfg.PartitionID] + if !exists { + return PartitionUnknown, time.Time{}, ErrPartitionDoesNotExist + } + + return partition.GetState(), partition.GetStateTime(), nil +} + +// ChangePartitionState changes the partition state to toState. +// This function returns ErrPartitionDoesNotExist if the partition doesn't exist, +// and ErrPartitionStateChangeNotAllowed if the state change is not allowed. +func (l *PartitionInstanceLifecycler) ChangePartitionState(ctx context.Context, toState PartitionState) error { + return l.runOnLifecyclerLoop(func() error { + err := l.updateRing(ctx, func(ring *PartitionRingDesc) (bool, error) { + partition, exists := ring.Partitions[l.cfg.PartitionID] + if !exists { + return false, ErrPartitionDoesNotExist + } + + if partition.State == toState { + return false, nil + } + + if !isPartitionStateChangeAllowed(partition.State, toState) { + return false, errors.Wrapf(ErrPartitionStateChangeNotAllowed, "change partition state from %s to %s", partition.State.CleanName(), toState.CleanName()) + } + + return ring.UpdatePartitionState(l.cfg.PartitionID, toState, time.Now()), nil + }) + + if err != nil { + level.Warn(l.logger).Log("msg", "failed to change partition state", "partition", l.cfg.PartitionID, "to_state", toState, "err", err) + } + + return err + }) +} + +func (l *PartitionInstanceLifecycler) starting(ctx context.Context) error { + if l.CreatePartitionOnStartup() { + return errors.Wrap(l.createPartitionAndRegisterOwner(ctx), "create partition and register owner") + } + + return errors.Wrap(l.waitPartitionAndRegisterOwner(ctx), "wait partition and register owner") +} + +func (l *PartitionInstanceLifecycler) running(ctx context.Context) error { + reconcileTicker := time.NewTicker(l.cfg.PollingInterval) + defer reconcileTicker.Stop() + + for { + select { + case <-reconcileTicker.C: + l.reconcileOwnedPartition(ctx, time.Now()) + l.reconcileOtherPartitions(ctx, time.Now()) + + case f := <-l.actorChan: + f() + + case <-ctx.Done(): + return nil + } + } +} + +func (l *PartitionInstanceLifecycler) stopping(_ error) error { + level.Info(l.logger).Log("msg", "partition ring lifecycler is shutting down", "ring", l.ringName) + + // Remove the instance from partition owners, if configured to do so. 
+ if l.RemoveOwnerOnShutdown() { + err := l.updateRing(context.Background(), func(ring *PartitionRingDesc) (bool, error) { + return ring.RemoveOwner(l.cfg.InstanceID), nil + }) + + if err != nil { + level.Error(l.logger).Log("msg", "failed to remove instance from partition owners on shutdown", "instance", l.cfg.InstanceID, "partition", l.cfg.PartitionID, "err", err) + } else { + level.Info(l.logger).Log("msg", "instance removed from partition owners", "instance", l.cfg.InstanceID, "partition", l.cfg.PartitionID) + } + } + + return nil +} + +// runOnLifecyclerLoop runs fn within the lifecycler loop. +func (l *PartitionInstanceLifecycler) runOnLifecyclerLoop(fn func() error) error { + sc := l.ServiceContext() + if sc == nil { + return errors.New("lifecycler not running") + } + + errCh := make(chan error) + wrappedFn := func() { + errCh <- fn() + } + + select { + case <-sc.Done(): + return errors.New("lifecycler not running") + case l.actorChan <- wrappedFn: + return <-errCh + } +} + +func (l *PartitionInstanceLifecycler) getRing(ctx context.Context) (*PartitionRingDesc, error) { + in, err := l.store.Get(ctx, l.ringKey) + if err != nil { + return nil, err + } + + return GetOrCreatePartitionRingDesc(in), nil +} + +func (l *PartitionInstanceLifecycler) updateRing(ctx context.Context, update func(ring *PartitionRingDesc) (bool, error)) error { + return l.store.CAS(ctx, l.ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + ringDesc := GetOrCreatePartitionRingDesc(in) + + if changed, err := update(ringDesc); err != nil { + return nil, false, err + } else if !changed { + return nil, false, nil + } + + return ringDesc, true, nil + }) +} + +func (l *PartitionInstanceLifecycler) createPartitionAndRegisterOwner(ctx context.Context) error { + return l.updateRing(ctx, func(ring *PartitionRingDesc) (bool, error) { + now := time.Now() + changed := false + + partitionDesc, exists := ring.Partitions[l.cfg.PartitionID] + if exists { + level.Info(l.logger).Log("msg", "partition found in the ring", "partition", l.cfg.PartitionID, "state", partitionDesc.GetState(), "state_timestamp", partitionDesc.GetState().String(), "tokens", len(partitionDesc.GetTokens())) + } else { + level.Info(l.logger).Log("msg", "partition not found in the ring", "partition", l.cfg.PartitionID) + } + + if !exists { + // The partition doesn't exist, so we create a new one. A new partition should always be created + // in PENDING state. + ring.AddPartition(l.cfg.PartitionID, PartitionPending, now) + changed = true + } + + // Ensure the instance is added as partition owner. + if ring.AddOrUpdateOwner(l.cfg.InstanceID, OwnerActive, l.cfg.PartitionID, now) { + changed = true + } + + return changed, nil + }) +} + +func (l *PartitionInstanceLifecycler) waitPartitionAndRegisterOwner(ctx context.Context) error { + pollTicker := time.NewTicker(l.cfg.PollingInterval) + defer pollTicker.Stop() + + // Wait until the partition exists. 
+	checkPartitionExist := func() (bool, error) {
+		level.Info(l.logger).Log("msg", "checking if the partition exists in the ring", "partition", l.cfg.PartitionID)
+
+		ring, err := l.getRing(ctx)
+		if err != nil {
+			return false, errors.Wrap(err, "read partition ring")
+		}
+
+		if ring.HasPartition(l.cfg.PartitionID) {
+			level.Info(l.logger).Log("msg", "partition found in the ring", "partition", l.cfg.PartitionID)
+			return true, nil
+		}
+
+		level.Info(l.logger).Log("msg", "partition not found in the ring", "partition", l.cfg.PartitionID)
+		return false, nil
+	}
+
+	for {
+		if exists, err := checkPartitionExist(); err != nil {
+			return err
+		} else if exists {
+			break
+		}
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+
+		case <-pollTicker.C:
+			// Throttle.
+		}
+	}
+
+	// Ensure the instance is added as partition owner.
+	return l.updateRing(ctx, func(ring *PartitionRingDesc) (bool, error) {
+		return ring.AddOrUpdateOwner(l.cfg.InstanceID, OwnerActive, l.cfg.PartitionID, time.Now()), nil
+	})
+}
+
+// reconcileOwnedPartition reconciles the owned partition.
+// This function should be called periodically.
+func (l *PartitionInstanceLifecycler) reconcileOwnedPartition(ctx context.Context, now time.Time) {
+	const reconcileType = "owned-partition"
+	l.reconcilesTotal.WithLabelValues(reconcileType).Inc()
+
+	err := l.updateRing(ctx, func(ring *PartitionRingDesc) (bool, error) {
+		partitionID := l.cfg.PartitionID
+
+		partition, exists := ring.Partitions[partitionID]
+		if !exists {
+			return false, ErrPartitionDoesNotExist
+		}
+
+		// A pending partition should be switched to active once enough owners have been
+		// registered for longer than the waiting period.
+		if partition.IsPending() && ring.PartitionOwnersCountUpdatedBefore(partitionID, now.Add(-l.cfg.WaitOwnersDurationOnPending)) >= l.cfg.WaitOwnersCountOnPending {
+			level.Info(l.logger).Log("msg", "switching partition state because enough owners have been registered and minimum waiting time has elapsed", "partition", l.cfg.PartitionID, "from_state", PartitionPending, "to_state", PartitionActive)
+			return ring.UpdatePartitionState(partitionID, PartitionActive, now), nil
+		}
+
+		return false, nil
+	})
+
+	if err != nil {
+		l.reconcilesFailedTotal.WithLabelValues(reconcileType).Inc()
+		level.Warn(l.logger).Log("msg", "failed to reconcile owned partition", "partition", l.cfg.PartitionID, "err", err)
+	}
+}
+
+// reconcileOtherPartitions reconciles other partitions.
+// This function should be called periodically.
+func (l *PartitionInstanceLifecycler) reconcileOtherPartitions(ctx context.Context, now time.Time) {
+	const reconcileType = "other-partitions"
+	l.reconcilesTotal.WithLabelValues(reconcileType).Inc()
+
+	err := l.updateRing(ctx, func(ring *PartitionRingDesc) (bool, error) {
+		changed := false
+
+		if l.cfg.DeleteInactivePartitionAfterDuration > 0 {
+			deleteBefore := now.Add(-l.cfg.DeleteInactivePartitionAfterDuration)
+
+			for partitionID, partition := range ring.Partitions {
+				// Never delete the partition owned by this lifecycler, since it's expected to have at least
+				// this instance as an owner.
+				if partitionID == l.cfg.PartitionID {
+					continue
+				}
+
+				// A partition is safe to remove only if it has been inactive for longer than the wait period
+				// and it has no owners registered.
+ if partition.IsInactiveSince(deleteBefore) && ring.PartitionOwnersCount(partitionID) == 0 { + level.Info(l.logger).Log("msg", "removing inactive partition with no owners from ring", "partition", partitionID, "state", partition.State.CleanName(), "state_timestamp", partition.GetStateTime().String()) + ring.RemovePartition(partitionID) + changed = true + } + } + } + + return changed, nil + }) + + if err != nil { + l.reconcilesFailedTotal.WithLabelValues(reconcileType).Inc() + level.Warn(l.logger).Log("msg", "failed to reconcile other partitions", "err", err) + } +} + +func isPartitionStateChangeAllowed(from, to PartitionState) bool { + for _, allowed := range allowedPartitionStateChanges[from] { + if to == allowed { + return true + } + } + + return false +} diff --git a/vendor/github.com/grafana/dskit/ring/partition_ring_model.go b/vendor/github.com/grafana/dskit/ring/partition_ring_model.go index e397d1915c6..59c8aab03c5 100644 --- a/vendor/github.com/grafana/dskit/ring/partition_ring_model.go +++ b/vendor/github.com/grafana/dskit/ring/partition_ring_model.go @@ -27,7 +27,12 @@ func GetOrCreatePartitionRingDesc(in any) *PartitionRingDesc { return NewPartitionRingDesc() } - return in.(*PartitionRingDesc) + desc := in.(*PartitionRingDesc) + if desc == nil { + return NewPartitionRingDesc() + } + + return desc } func NewPartitionRingDesc() *PartitionRingDesc { @@ -69,6 +74,12 @@ func (m *PartitionRingDesc) ownersByPartition() map[int32][]string { for id, o := range m.Owners { out[o.OwnedPartition] = append(out[o.OwnedPartition], id) } + + // Sort owners to have predictable tests. + for id := range out { + slices.Sort(out[id]) + } + return out } @@ -176,13 +187,24 @@ func (m *PartitionRingDesc) AddOrUpdateOwner(id string, state OwnerState, ownedP } updated.UpdatedTimestamp = now.Unix() - m.Owners[id] = updated + + if m.Owners == nil { + m.Owners = map[string]OwnerDesc{id: updated} + } else { + m.Owners[id] = updated + } + return true } -// RemoveOwner removes a partition owner. -func (m *PartitionRingDesc) RemoveOwner(id string) { +// RemoveOwner removes a partition owner. Returns true if the ring has been changed. +func (m *PartitionRingDesc) RemoveOwner(id string) bool { + if _, ok := m.Owners[id]; !ok { + return false + } + delete(m.Owners, id) + return true } // HasOwner returns whether a owner exists. @@ -191,6 +213,31 @@ func (m *PartitionRingDesc) HasOwner(id string) bool { return ok } +// PartitionOwnersCount returns the number of owners for a given partition. +func (m *PartitionRingDesc) PartitionOwnersCount(partitionID int32) int { + count := 0 + for _, o := range m.Owners { + if o.OwnedPartition == partitionID { + count++ + } + } + return count +} + +// PartitionOwnersCountUpdatedBefore returns the number of owners for a given partition, +// including only owners which have been updated the last time before the input timestamp. +func (m *PartitionRingDesc) PartitionOwnersCountUpdatedBefore(partitionID int32, before time.Time) int { + count := 0 + beforeSeconds := before.Unix() + + for _, o := range m.Owners { + if o.OwnedPartition == partitionID && o.GetUpdatedTimestamp() < beforeSeconds { + count++ + } + } + return count +} + // Merge implements memberlist.Mergeable. 
func (m *PartitionRingDesc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error) { return m.mergeWithTime(mergeable, localCAS, time.Now()) @@ -363,6 +410,14 @@ func (m *PartitionDesc) IsInactive() bool { return m.GetState() == PartitionInactive } +func (m *PartitionDesc) IsInactiveSince(since time.Time) bool { + return m.IsInactive() && m.GetStateTimestamp() < since.Unix() +} + +func (m *PartitionDesc) GetStateTime() time.Time { + return time.Unix(m.GetStateTimestamp(), 0) +} + func (m *PartitionDesc) Clone() PartitionDesc { return *(proto.Clone(m).(*PartitionDesc)) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 09d6f77e3da..8dc49251731 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -538,7 +538,7 @@ github.com/gosimple/slug # github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc ## explicit; go 1.13 github.com/grafana-tools/sdk -# github.com/grafana/dskit v0.0.0-20240208074945-f245b483eb15 +# github.com/grafana/dskit v0.0.0-20240213103939-80881aa4a62f ## explicit; go 1.20 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast
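Taken together, these changes pair each ingester replica with a partition in a dedicated partition ring. The new IngesterPartitionID helper derives that partition from the trailing sequence number only, so replicas with the same sequence number in different zones (for example ingester-zone-a-1 and ingester-zone-b-1) share partition 1, whereas IngesterZonalPartition spreads zones across distinct partitions. Below is a minimal standalone sketch of the same parsing; the exact regular expression is an assumption inferred from the tests, since ingesterIDRegexp is defined outside this hunk.

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// Assumed pattern: an optional "-zone-<letter>" infix followed by a numeric suffix,
// e.g. "ingester-zone-a-1", "mimir-write-2" or "ingester-0".
var ingesterIDPattern = regexp.MustCompile(`(-zone-[a-z])?-([0-9]+)$`)

// partitionID mirrors the logic of IngesterPartitionID: the partition is the
// ingester sequence number, regardless of the zone.
func partitionID(ingesterID string) (int32, error) {
	match := ingesterIDPattern.FindStringSubmatch(ingesterID)
	if len(match) == 0 {
		return 0, fmt.Errorf("ingester ID %q doesn't match %q", ingesterID, ingesterIDPattern.String())
	}

	seq, err := strconv.Atoi(match[2])
	if err != nil {
		return 0, fmt.Errorf("invalid sequence number in ingester ID %q: %w", ingesterID, err)
	}

	return int32(seq), nil
}

func main() {
	for _, id := range []string{"ingester-zone-a-1", "ingester-zone-b-1", "mimir-write-2"} {
		p, err := partitionID(id)
		fmt.Println(id, "->", p, err) // zone-a-1 and zone-b-1 both map to partition 1
	}
}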
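PartitionInstanceLifecycler itself is a dskit service: starting() creates the partition in PENDING state (or waits for it to appear) and registers the instance as an owner, running() reconciles on every PollingInterval tick, and stopping() optionally removes the owner. The sketch below shows one way a replica might wire it up; the ring name, ring key and threshold values are placeholders, and the kv.Client is assumed to be built elsewhere from the existing ring KV configuration.

package example

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

// startPartitionLifecycler starts the lifecycler for the partition owned by this replica.
// It blocks until the lifecycler is running and leaves stopping it to the caller.
func startPartitionLifecycler(ctx context.Context, instanceID string, store kv.Client, logger log.Logger, reg prometheus.Registerer) (*ring.PartitionInstanceLifecycler, error) {
	// e.g. "ingester-zone-a-1" and "ingester-zone-b-1" both resolve to partition 1.
	partitionID, err := ingest.IngesterPartitionID(instanceID)
	if err != nil {
		return nil, err
	}

	lifecycler := ring.NewPartitionInstanceLifecycler(ring.PartitionInstanceLifecyclerConfig{
		PartitionID:                          partitionID,
		InstanceID:                           instanceID,
		WaitOwnersCountOnPending:             2,                // example value only
		WaitOwnersDurationOnPending:          10 * time.Second, // example value only
		DeleteInactivePartitionAfterDuration: time.Minute,      // example value only
	}, "ingester-partitions" /* placeholder ring name */, "ingester-partitions-key" /* placeholder ring key */, store, logger, reg)

	if err := services.StartAndAwaitRunning(ctx, lifecycler); err != nil {
		return nil, err
	}

	return lifecycler, nil
}

On shutdown, a replica that is leaving the ring for good would call SetRemoveOwnerOnShutdown(true) before services.StopAndAwaitTerminated, so its owner entry is deleted from the partition ring.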
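allowedPartitionStateChanges encodes a small state machine: PENDING may move to ACTIVE or INACTIVE, ACTIVE only to INACTIVE, and INACTIVE back to PENDING or ACTIVE; ChangePartitionState rejects anything else with ErrPartitionStateChangeNotAllowed, while requesting the current state is a no-op. A hypothetical helper, assuming a lifecycler constructed as in the previous sketch, that waits for the reconciliation loop to promote the partition to ACTIVE:

package example

import (
	"context"
	"errors"
	"time"

	"github.com/grafana/dskit/ring"
)

// waitPartitionActive blocks until the lifecycler's partition has been switched to
// ACTIVE by the reconciliation loop, or until ctx expires.
func waitPartitionActive(ctx context.Context, lifecycler *ring.PartitionInstanceLifecycler, pollInterval time.Duration) error {
	for {
		state, _, err := lifecycler.GetPartitionState(ctx)
		if err != nil && !errors.Is(err, ring.ErrPartitionDoesNotExist) {
			return err
		}
		if state == ring.PartitionActive {
			return nil
		}

		// Not created yet, still PENDING, or INACTIVE: poll again.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollInterval):
		}
	}
}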
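The reconciliation rules themselves reduce to timestamp arithmetic on the ring descriptor: a PENDING partition is promoted once at least WaitOwnersCountOnPending owners have been registered for longer than WaitOwnersDurationOnPending, and an INACTIVE partition with no owners is deleted once it has been inactive for longer than DeleteInactivePartitionAfterDuration (a zero value disables deletion). A sketch of the same checks written against the descriptor helpers added in this change; both function names are hypothetical.

package example

import (
	"time"

	"github.com/grafana/dskit/ring"
)

// pendingPartitionShouldActivate mirrors the check performed by reconcileOwnedPartition.
func pendingPartitionShouldActivate(desc *ring.PartitionRingDesc, partitionID int32, waitOwners int, waitDuration time.Duration, now time.Time) bool {
	partition, ok := desc.Partitions[partitionID]
	if !ok || !partition.IsPending() {
		return false
	}

	// Only owners registered for longer than the waiting period count.
	return desc.PartitionOwnersCountUpdatedBefore(partitionID, now.Add(-waitDuration)) >= waitOwners
}

// inactivePartitionShouldBeDeleted mirrors the check performed by reconcileOtherPartitions.
func inactivePartitionShouldBeDeleted(desc *ring.PartitionRingDesc, partitionID int32, retention time.Duration, now time.Time) bool {
	if retention <= 0 {
		// A zero retention disables deletion of inactive partitions.
		return false
	}

	partition, ok := desc.Partitions[partitionID]
	if !ok {
		return false
	}

	return partition.IsInactiveSince(now.Add(-retention)) && desc.PartitionOwnersCount(partitionID) == 0
}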
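Finally, GetOrCreatePartitionRingDesc now guards against a type assertion that yields a typed nil: an interface value can be non-nil while wrapping a nil *PartitionRingDesc, in which case the old code handed back a nil descriptor that later updates would trip over. The behaviour the new check protects against can be reproduced in isolation:

package main

import "fmt"

// desc stands in for a ring descriptor type such as PartitionRingDesc.
type desc struct{ partitions map[int32]struct{} }

func main() {
	var d *desc            // typed nil pointer
	var in interface{} = d // the interface value is NOT nil: it carries the type *desc

	fmt.Println(in == nil) // false

	got := in.(*desc)       // the type assertion succeeds...
	fmt.Println(got == nil) // ...but the concrete pointer is still nil,
	// which is why GetOrCreatePartitionRingDesc adds the explicit nil check
	// and falls back to NewPartitionRingDesc().
}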