From cfd41910baee8ff2d7c06010fec45d580dcb9e62 Mon Sep 17 00:00:00 2001
From: TheR1sing3un <87409330+TheR1sing3un@users.noreply.github.com>
Date: Tue, 25 Jul 2023 13:54:30 +0800
Subject: [PATCH] [ISSUE #307] Support metrics (#308)
* feat(core): add metrics manager
1. add metrics manager
* feat(core): record metrics
1. record metrics
* fix(core): avoid NPE
1. avoid NPE
* style(core): fix checkstyle
1. fix checkstyle
* feat(core): optimize metrics name
1. optimize metrics name
* rerun
* rerun
---
dledger/pom.xml | 20 +
.../storage/dledger/DLedgerConfig.java | 76 ++++
.../storage/dledger/DLedgerEntryPusher.java | 15 +
.../storage/dledger/DLedgerServer.java | 35 +-
.../metrics/DLedgerMetricsConstant.java | 66 +++
.../metrics/DLedgerMetricsManager.java | 394 ++++++++++++++++++
.../dledger/metrics/MetricsExporterType.java | 52 +++
.../dledger/metrics/NopLongCounter.java | 38 ++
.../dledger/metrics/NopLongHistogram.java | 39 ++
.../metrics/NopObservableLongGauge.java | 22 +
.../dledger/snapshot/SnapshotManager.java | 24 +-
.../dledger/snapshot/SnapshotStore.java | 4 +
.../snapshot/file/FileSnapshotStore.java | 92 ++--
.../statemachine/StateMachineCaller.java | 11 +
.../example/register/RegisterDLedger.java | 2 +
.../register/command/RegisterCommand.java | 9 +
pom.xml | 28 ++
script/deploy-three-register.sh | 23 +
18 files changed, 904 insertions(+), 46 deletions(-)
create mode 100644 dledger/src/main/java/io/openmessaging/storage/dledger/metrics/DLedgerMetricsConstant.java
create mode 100644 dledger/src/main/java/io/openmessaging/storage/dledger/metrics/DLedgerMetricsManager.java
create mode 100644 dledger/src/main/java/io/openmessaging/storage/dledger/metrics/MetricsExporterType.java
create mode 100644 dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopLongCounter.java
create mode 100644 dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopLongHistogram.java
create mode 100644 dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopObservableLongGauge.java
create mode 100755 script/deploy-three-register.sh
diff --git a/dledger/pom.xml b/dledger/pom.xml
index 2c378362..bb519545 100644
--- a/dledger/pom.xml
+++ b/dledger/pom.xml
@@ -39,6 +39,26 @@
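             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-exporter-otlp</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-exporter-prometheus</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-exporter-logging</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-sdk</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>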
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java
index fa1f85b2..e53e11fb 100644
--- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java
@@ -16,6 +16,7 @@
package io.openmessaging.storage.dledger;
+import io.openmessaging.storage.dledger.metrics.MetricsExporterType;
import io.openmessaging.storage.dledger.snapshot.SnapshotEntryResetStrategy;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
@@ -119,6 +120,17 @@ public class DLedgerConfig {
*/
private boolean enableFastAdvanceCommitIndex = false;
+ /** Metrics exporter to use; DISABLE (the default) means DLedgerMetricsManager.defaultInit sets nothing up. */
+ private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
+
+ /** Endpoint of the OTLP gRPC exporter; must be non-blank when the exporter type is OTLP_GRPC. */
+ private String metricsGrpcExporterTarget = "";
+ /** Extra headers for the OTLP gRPC exporter, written as comma-separated key:value pairs. */
+ private String metricsGrpcExporterHeader = "";
+ /** Timeout handed to the OTLP gRPC exporter, in milliseconds. */
+ private long metricGrpcExporterTimeOutInMills = 3 * 1000;
+ /** Interval of the periodic reader that feeds the OTLP gRPC exporter, in milliseconds. */
+ private long metricGrpcExporterIntervalInMills = 60 * 1000;
+ /** Interval of the periodic reader that feeds the logging exporter, in milliseconds. */
+ private long metricLoggingExporterIntervalInMills = 10 * 1000;
+
+ /** Port of the Prometheus exporter's HTTP server. */
+ private int metricsPromExporterPort = 5557;
+ /** Host of the Prometheus exporter's HTTP server; a blank value is replaced by 0.0.0.0. */
+ private String metricsPromExporterHost = "";
+
public String getDefaultPath() {
return storeBaseDir + File.separator + "dledger-" + selfId;
}
@@ -559,4 +571,68 @@ public boolean isEnableFastAdvanceCommitIndex() {
public void setEnableFastAdvanceCommitIndex(boolean enableFastAdvanceCommitIndex) {
this.enableFastAdvanceCommitIndex = enableFastAdvanceCommitIndex;
}
+
+ /** Sets which metrics exporter is used. */
+ public void setMetricsExporterType(MetricsExporterType metricsExporterType) {
+ this.metricsExporterType = metricsExporterType;
+ }
+
+ /** Returns the configured metrics exporter type. */
+ public MetricsExporterType getMetricsExporterType() {
+ return metricsExporterType;
+ }
+
+ /** Sets the endpoint of the OTLP gRPC exporter. */
+ public void setMetricsGrpcExporterTarget(String metricsGrpcExporterTarget) {
+ this.metricsGrpcExporterTarget = metricsGrpcExporterTarget;
+ }
+
+ /** Sets the extra OTLP gRPC exporter headers (comma-separated key:value pairs). */
+ public void setMetricsGrpcExporterHeader(String metricsGrpcExporterHeader) {
+ this.metricsGrpcExporterHeader = metricsGrpcExporterHeader;
+ }
+
+ /** Sets the OTLP gRPC exporter timeout, in milliseconds. */
+ public void setMetricGrpcExporterTimeOutInMills(long metricGrpcExporterTimeOutInMills) {
+ this.metricGrpcExporterTimeOutInMills = metricGrpcExporterTimeOutInMills;
+ }
+
+ /** Sets the OTLP gRPC export interval, in milliseconds. */
+ public void setMetricGrpcExporterIntervalInMills(long metricGrpcExporterIntervalInMills) {
+ this.metricGrpcExporterIntervalInMills = metricGrpcExporterIntervalInMills;
+ }
+
+ /** Sets the logging exporter interval, in milliseconds. */
+ public void setMetricLoggingExporterIntervalInMills(long metricLoggingExporterIntervalInMills) {
+ this.metricLoggingExporterIntervalInMills = metricLoggingExporterIntervalInMills;
+ }
+
+ /** Sets the port of the Prometheus exporter's HTTP server. */
+ public void setMetricsPromExporterPort(int metricsPromExporterPort) {
+ this.metricsPromExporterPort = metricsPromExporterPort;
+ }
+
+ /** Sets the host of the Prometheus exporter's HTTP server. */
+ public void setMetricsPromExporterHost(String metricsPromExporterHost) {
+ this.metricsPromExporterHost = metricsPromExporterHost;
+ }
+
+ /** Returns the endpoint of the OTLP gRPC exporter. */
+ public String getMetricsGrpcExporterTarget() {
+ return metricsGrpcExporterTarget;
+ }
+
+ /** Returns the extra OTLP gRPC exporter headers (comma-separated key:value pairs). */
+ public String getMetricsGrpcExporterHeader() {
+ return metricsGrpcExporterHeader;
+ }
+
+ /** Returns the OTLP gRPC exporter timeout, in milliseconds. */
+ public long getMetricGrpcExporterTimeOutInMills() {
+ return metricGrpcExporterTimeOutInMills;
+ }
+
+ /** Returns the OTLP gRPC export interval, in milliseconds. */
+ public long getMetricGrpcExporterIntervalInMills() {
+ return metricGrpcExporterIntervalInMills;
+ }
+
+ /** Returns the logging exporter interval, in milliseconds. */
+ public long getMetricLoggingExporterIntervalInMills() {
+ return metricLoggingExporterIntervalInMills;
+ }
+
+ /** Returns the port of the Prometheus exporter's HTTP server. */
+ public int getMetricsPromExporterPort() {
+ return metricsPromExporterPort;
+ }
+
+ /** Returns the host of the Prometheus exporter's HTTP server. */
+ public String getMetricsPromExporterHost() {
+ return metricsPromExporterHost;
+ }
}
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java
index a6fb6ed3..37ed5564 100644
--- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java
@@ -24,6 +24,7 @@
import io.openmessaging.storage.dledger.common.WriteClosure;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
+import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse;
@@ -41,6 +42,7 @@
import io.openmessaging.storage.dledger.utils.PreConditions;
import io.openmessaging.storage.dledger.utils.Quota;
+import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -56,9 +58,12 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.LABEL_REMOTE_ID;
+
public class DLedgerEntryPusher {
private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerEntryPusher.class);
@@ -708,6 +713,9 @@ private void sendBatchAppendEntryRequest() throws Exception {
final long firstIndex = batchAppendEntryRequest.getFirstEntryIndex();
final long lastIndex = batchAppendEntryRequest.getLastEntryIndex();
final long lastTerm = batchAppendEntryRequest.getLastEntryTerm();
+ final long entriesCount = batchAppendEntryRequest.getCount();
+ final long entriesSize = batchAppendEntryRequest.getTotalSize();
+ StopWatch watch = StopWatch.createStarted();
CompletableFuture responseFuture = dLedgerRpcService.push(batchAppendEntryRequest);
pendingMap.put(firstIndex, new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount()));
responseFuture.whenComplete((x, ex) -> {
@@ -716,6 +724,10 @@ private void sendBatchAppendEntryRequest() throws Exception {
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
switch (responseCode) {
case SUCCESS:
+ Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().put(LABEL_REMOTE_ID, this.peerId).build();
+ DLedgerMetricsManager.replicateEntryLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes);
+ DLedgerMetricsManager.replicateEntryBatchCount.record(entriesCount, attributes);
+ DLedgerMetricsManager.replicateEntryBatchBytes.record(entriesSize, attributes);
pendingMap.remove(firstIndex);
if (lastIndex > matchIndex) {
matchIndex = lastIndex;
@@ -767,12 +779,15 @@ private void doInstallSnapshot() throws Exception {
long lastIncludedIndex = snapshot.getMeta().getLastIncludedIndex();
long lastIncludedTerm = snapshot.getMeta().getLastIncludedTerm();
InstallSnapshotRequest request = buildInstallSnapshotRequest(snapshot);
+ StopWatch watch = StopWatch.createStarted();
CompletableFuture future = DLedgerEntryPusher.this.dLedgerRpcService.installSnapshot(request);
InstallSnapshotResponse response = future.get(3, TimeUnit.SECONDS);
PreConditions.check(response != null, DLedgerResponseCode.INTERNAL_ERROR, "installSnapshot lastIncludedIndex=%d", writeIndex);
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(response.getCode());
switch (responseCode) {
case SUCCESS:
+ Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().put(LABEL_REMOTE_ID, this.peerId).build();
+ DLedgerMetricsManager.installSnapshotLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes);
logger.info("[DoInstallSnapshot-{}]install snapshot success, lastIncludedIndex = {}, lastIncludedTerm", peerId, lastIncludedIndex, lastIncludedTerm);
if (lastIncludedIndex > matchIndex) {
matchIndex = lastIncludedIndex;
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java
index a204aeed..6d875412 100644
--- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java
@@ -28,6 +28,7 @@
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.entry.DLedgerEntryType;
import io.openmessaging.storage.dledger.exception.DLedgerException;
+import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
@@ -59,6 +60,8 @@
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.PreConditions;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -72,6 +75,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
@@ -80,6 +84,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.LABEL_READ_MODE;
+
public class DLedgerServer extends AbstractDLedgerServer {
private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerServer.class);
@@ -234,7 +240,6 @@ public synchronized void registerStateMachine(final StateMachine fsm) {
}
this.fsmCaller = fsmCaller;
// Register state machine caller to entry pusher
- this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
if (dLedgerStore instanceof DLedgerMmapFileStore) {
((DLedgerMmapFileStore) dLedgerStore).setEnableCleanSpaceService(false);
}
@@ -339,10 +344,13 @@ public AppendFuture appendAsLeader(List bodies) thr
return AppendFuture.newCompletedFuture(-1, null);
}
AppendFuture future;
+ StopWatch watch = StopWatch.createStarted();
DLedgerEntry entry = new DLedgerEntry();
+ long totalBytes = 0;
if (bodies.size() > 1) {
long[] positions = new long[bodies.size()];
for (int i = 0; i < bodies.size(); i++) {
+ totalBytes += bodies.get(i).length;
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(bodies.get(i));
entry = dLedgerStore.appendAsLeader(dLedgerEntry);
@@ -352,12 +360,23 @@ public AppendFuture appendAsLeader(List bodies) thr
future = new BatchAppendFuture<>(positions);
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
+ totalBytes += bodies.get(0).length;
dLedgerEntry.setBody(bodies.get(0));
entry = dLedgerStore.appendAsLeader(dLedgerEntry);
future = new AppendFuture<>();
}
final DLedgerEntry finalResEntry = entry;
final AppendFuture finalFuture = future;
+ final long totalBytesFinal = totalBytes;
+ finalFuture.handle((r, e) -> {
+ if (e == null && r.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {
+ Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().build();
+ DLedgerMetricsManager.appendEntryLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes);
+ DLedgerMetricsManager.appendEntryBatchCount.record(bodies.size(), attributes);
+ DLedgerMetricsManager.appendEntryBatchBytes.record(totalBytesFinal, attributes);
+ }
+ return r;
+ });
Closure closure = new Closure() {
@Override
public void done(Status status) {
@@ -421,10 +440,22 @@ private void dealUnsafeRead(ReadClosure closure) throws DLedgerException {
private void dealRaftLogRead(ReadClosure closure) throws DLedgerException {
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
+ StopWatch watch = StopWatch.createStarted();
// append an empty raft log, call closure when this raft log is applied
DLedgerEntry emptyEntry = new DLedgerEntry(DLedgerEntryType.NOOP);
DLedgerEntry dLedgerEntry = dLedgerStore.appendAsLeader(emptyEntry);
- dLedgerEntryPusher.appendClosure(closure, dLedgerEntry.getTerm(), dLedgerEntry.getIndex());
+ Closure realClosure = new Closure() {
+ @Override
+ public void done(Status status) {
+ if (status.isOk()) {
+ AttributesBuilder attributesBuilder = DLedgerMetricsManager.newAttributesBuilder();
+ attributesBuilder.put(LABEL_READ_MODE, ReadMode.RAFT_LOG_READ.name().toLowerCase());
+ DLedgerMetricsManager.readLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributesBuilder.build());
+ }
+ closure.done(status);
+ }
+ };
+ dLedgerEntryPusher.appendClosure(realClosure, dLedgerEntry.getTerm(), dLedgerEntry.getIndex());
}
private void dealReadIndexRead(ReadClosure closure) throws DLedgerException {
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/DLedgerMetricsConstant.java b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/DLedgerMetricsConstant.java
new file mode 100644
index 00000000..3ae775f2
--- /dev/null
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/DLedgerMetricsConstant.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017-2022 The DLedger Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.storage.dledger.metrics;
+
+/**
+ * String constants shared by the DLedger metrics code: instrument names,
+ * attribute (label) keys and the name of the meter the instruments belong to.
+ *
+ * <p>This class only holds constants, so it is final and cannot be instantiated.
+ */
+public final class DLedgerMetricsConstant {
+
+    // metrics name
+
+    public static final String HISTOGRAM_APPEND_ENTRY_LATENCY = "dledger_append_entry_latency";
+
+    public static final String HISTOGRAM_APPEND_ENTRY_BATCH_BYTES = "dledger_append_entry_batch_bytes";
+
+    public static final String HISTOGRAM_APPEND_ENTRY_BATCH_COUNT = "dledger_append_entry_batch_count";
+
+    public static final String HISTOGRAM_REPLICATE_ENTRY_LATENCY = "dledger_replicate_entry_latency";
+
+    public static final String HISTOGRAM_REPLICATE_ENTRY_BATCH_BYTES = "dledger_replicate_entry_batch_bytes";
+
+    public static final String HISTOGRAM_REPLICATE_ENTRY_BATCH_COUNT = "dledger_replicate_entry_batch_count";
+
+    public static final String HISTOGRAM_APPLY_TASK_LATENCY = "dledger_apply_task_latency";
+
+    public static final String HISTOGRAM_APPLY_TASK_BATCH_COUNT = "dledger_apply_task_batch_count";
+
+    public static final String HISTOGRAM_READ_LATENCY = "dledger_read_latency";
+
+    public static final String HISTOGRAM_SAVE_SNAPSHOT_LATENCY = "dledger_save_snapshot_latency";
+
+    public static final String HISTOGRAM_LOAD_SNAPSHOT_LATENCY = "dledger_load_snapshot_latency";
+
+    public static final String HISTOGRAM_INSTALL_SNAPSHOT_LATENCY = "dledger_install_snapshot_latency";
+
+    public static final String GAUGE_ENTRIES_COUNT = "dledger_entries_count";
+
+    public static final String GAUGE_SNAPSHOT_COUNT = "dledger_snapshot_count";
+
+    public static final String GAUGE_ENTRY_STORE_SIZE = "dledger_entry_store_size";
+
+
+    // label
+
+    public static final String LABEL_GROUP = "group";
+
+    public static final String LABEL_SELF_ID = "self_id";
+
+    public static final String LABEL_REMOTE_ID = "remote_id";
+
+    public static final String LABEL_READ_MODE = "read_mode";
+
+
+    // meter name
+
+    public static final String METER_NAME = "dledger";
+
+    private DLedgerMetricsConstant() {
+        // constants holder, never instantiated
+    }
+
+}
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/DLedgerMetricsManager.java b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/DLedgerMetricsManager.java
new file mode 100644
index 00000000..b54bfba8
--- /dev/null
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/DLedgerMetricsManager.java
@@ -0,0 +1,394 @@
+/*
+ * Copyright 2017-2022 The DLedger Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.storage.dledger.metrics;
+
+import com.google.common.base.Splitter;
+import io.openmessaging.storage.dledger.DLedgerConfig;
+import io.openmessaging.storage.dledger.DLedgerServer;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
+import io.opentelemetry.exporter.logging.LoggingMetricExporter;
+
+import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
+import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.resources.Resource;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.GAUGE_ENTRIES_COUNT;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.GAUGE_ENTRY_STORE_SIZE;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.GAUGE_SNAPSHOT_COUNT;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_APPEND_ENTRY_BATCH_BYTES;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_APPEND_ENTRY_BATCH_COUNT;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_APPEND_ENTRY_LATENCY;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_APPLY_TASK_BATCH_COUNT;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_APPLY_TASK_LATENCY;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_INSTALL_SNAPSHOT_LATENCY;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_LOAD_SNAPSHOT_LATENCY;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_READ_LATENCY;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_REPLICATE_ENTRY_BATCH_BYTES;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_REPLICATE_ENTRY_BATCH_COUNT;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_REPLICATE_ENTRY_LATENCY;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.HISTOGRAM_SAVE_SNAPSHOT_LATENCY;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.LABEL_GROUP;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.LABEL_SELF_ID;
+import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.METER_NAME;
+
+public class DLedgerMetricsManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerMetricsManager.class);
+
+ // Scale factors for the latency bucket boundaries; the base unit is one microsecond.
+ private static double us = 1d;
+
+ private static double ms = 1000 * us;
+
+ private static double s = 1000 * ms;
+
+ // Source of the attributes builder handed out by newAttributesBuilder(); stays null until init(...) sets it.
+ public static Supplier<AttributesBuilder> attributesBuilderSupplier;
+
+ // Instruments start out as Nop* placeholders and are replaced with real instruments by init(...).
+ public static LongHistogram appendEntryLatency = new NopLongHistogram();
+
+ public static LongHistogram appendEntryBatchBytes = new NopLongHistogram();
+
+ public static LongHistogram appendEntryBatchCount = new NopLongHistogram();
+
+ public static LongHistogram replicateEntryLatency = new NopLongHistogram();
+
+ public static LongHistogram replicateEntryBatchBytes = new NopLongHistogram();
+
+ public static LongHistogram replicateEntryBatchCount = new NopLongHistogram();
+
+ public static LongHistogram applyTaskLatency = new NopLongHistogram();
+
+ public static LongHistogram applyTaskBatchCount = new NopLongHistogram();
+
+ public static LongHistogram readLatency = new NopLongHistogram();
+
+ public static LongHistogram saveSnapshotLatency = new NopLongHistogram();
+
+ public static LongHistogram loadSnapshotLatency = new NopLongHistogram();
+
+ public static LongHistogram installSnapshotLatency = new NopLongHistogram();
+
+ public static ObservableLongGauge entriesCount = new NopObservableLongGauge();
+
+ public static ObservableLongGauge snapshotCount = new NopObservableLongGauge();
+
+ public static ObservableLongGauge entryStoreSize = new NopObservableLongGauge();
+
+ // Exporter and reader objects created by defaultInit(...); each stays null unless its exporter type is selected.
+ public static OtlpGrpcMetricExporter otlpGrpcMetricExporter;
+
+ public static PeriodicMetricReader periodicMetricReader;
+
+ public static PrometheusHttpServer prometheusHttpServer;
+
+ public static LoggingMetricExporter loggingMetricsExporter;
+
+ /**
+  * Creates every DLedger instrument on the given meter and stores it in the static fields of this class.
+  *
+  * <p>Latency histograms are declared in microseconds: callers record
+  * {@code watch.getTime(TimeUnit.MICROSECONDS)} and the latency bucket view is microsecond based.
+  *
+  * @param meter meter the instruments are created on
+  * @param attributesBuilderSupplier source of the attributes builder returned by {@link #newAttributesBuilder()}
+  * @param server server whose store and state machine caller are read by the gauge callbacks
+  */
+ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier, DLedgerServer server) {
+     DLedgerMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
+
+     final String latencyUnit = "microseconds";
+
+     appendEntryLatency = newLongHistogram(meter, HISTOGRAM_APPEND_ENTRY_LATENCY, "DLedger append entry latency", latencyUnit);
+     appendEntryBatchBytes = newLongHistogram(meter, HISTOGRAM_APPEND_ENTRY_BATCH_BYTES, "DLedger append entry batch bytes", "bytes");
+     appendEntryBatchCount = newLongHistogram(meter, HISTOGRAM_APPEND_ENTRY_BATCH_COUNT, "DLedger append entry batch count", "count");
+
+     replicateEntryLatency = newLongHistogram(meter, HISTOGRAM_REPLICATE_ENTRY_LATENCY, "DLedger replicate entry latency", latencyUnit);
+     replicateEntryBatchBytes = newLongHistogram(meter, HISTOGRAM_REPLICATE_ENTRY_BATCH_BYTES, "DLedger replicate entry batch bytes", "bytes");
+     replicateEntryBatchCount = newLongHistogram(meter, HISTOGRAM_REPLICATE_ENTRY_BATCH_COUNT, "DLedger replicate entry batch count", "count");
+
+     applyTaskLatency = newLongHistogram(meter, HISTOGRAM_APPLY_TASK_LATENCY, "DLedger apply task latency", latencyUnit);
+     applyTaskBatchCount = newLongHistogram(meter, HISTOGRAM_APPLY_TASK_BATCH_COUNT, "DLedger apply task batch count", "count");
+
+     readLatency = newLongHistogram(meter, HISTOGRAM_READ_LATENCY, "DLedger read latency", latencyUnit);
+
+     saveSnapshotLatency = newLongHistogram(meter, HISTOGRAM_SAVE_SNAPSHOT_LATENCY, "DLedger save snapshot latency", latencyUnit);
+     loadSnapshotLatency = newLongHistogram(meter, HISTOGRAM_LOAD_SNAPSHOT_LATENCY, "DLedger load snapshot latency", latencyUnit);
+     installSnapshotLatency = newLongHistogram(meter, HISTOGRAM_INSTALL_SNAPSHOT_LATENCY, "DLedger install snapshot latency", latencyUnit);
+
+     entriesCount = meter.gaugeBuilder(GAUGE_ENTRIES_COUNT)
+         .setDescription("DLedger entries count")
+         .setUnit("count")
+         .ofLongs()
+         .buildWithCallback(measurement -> {
+             long count = server.getDLedgerStore().getLedgerEndIndex() - server.getDLedgerStore().getLedgerBeforeBeginIndex();
+             measurement.record(count, newAttributesBuilder().build());
+         });
+
+     snapshotCount = meter.gaugeBuilder(GAUGE_SNAPSHOT_COUNT)
+         .setDescription("DLedger snapshot count")
+         .setUnit("count")
+         .ofLongs()
+         .buildWithCallback(measurement ->
+             // Nothing is recorded while either the state machine caller or its snapshot manager is null.
+             Optional.ofNullable(server.getFsmCaller())
+                 .map(caller -> caller.getSnapshotManager())
+                 .ifPresent(manager -> measurement.record(manager.getSnapshotNum(), newAttributesBuilder().build())));
+
+     entryStoreSize = meter.gaugeBuilder(GAUGE_ENTRY_STORE_SIZE)
+         .setDescription("DLedger entry store size")
+         .setUnit("bytes")
+         .ofLongs()
+         .buildWithCallback(measurement -> {
+             // TODO: no measurement is recorded here yet, so this gauge currently reports nothing.
+         });
+ }
+
+ /** Builds a long-valued histogram with the given name, description and unit on the given meter. */
+ private static LongHistogram newLongHistogram(Meter meter, String name, String description, String unit) {
+     return meter.histogramBuilder(name)
+         .setDescription(description)
+         .setUnit(unit)
+         .ofLongs()
+         .build();
+ }
+
+ /**
+  * Returns the builder produced by the configured supplier, or an empty
+  * builder when no supplier has been set.
+  */
+ public static AttributesBuilder newAttributesBuilder() {
+     if (attributesBuilderSupplier == null) {
+         return Attributes.builder();
+     }
+     return attributesBuilderSupplier.get();
+ }
+
+ /**
+  * Tells whether the metrics settings of the given config are usable.
+  *
+  * @param config configuration to inspect, may be null
+  * @return false for a null config or a non-enabled exporter type; for OTLP_GRPC
+  *         true only when the exporter target is not blank; true for PROM and LOG
+  */
+ private static boolean checkConfig(DLedgerConfig config) {
+     if (config == null) {
+         return false;
+     }
+     MetricsExporterType exporterType = config.getMetricsExporterType();
+     if (!exporterType.isEnable()) {
+         return false;
+     }
+     if (exporterType == MetricsExporterType.OTLP_GRPC) {
+         // the gRPC exporter is the only one that needs a target
+         return StringUtils.isNotBlank(config.getMetricsGrpcExporterTarget());
+     }
+     return exporterType == MetricsExporterType.PROM || exporterType == MetricsExporterType.LOG;
+ }
+
+ /**
+  * Sets up metrics for the given server from its {@link DLedgerConfig}.
+  *
+  * <p>Returns without doing anything when the exporter type is null or DISABLE, and logs an error
+  * and returns when {@code checkConfig} rejects the configuration. Otherwise it registers the reader
+  * for the selected exporter (OTLP gRPC, Prometheus HTTP server or logging), registers the latency
+  * views, and calls {@code init} with a meter named {@code METER_NAME}.
+  *
+  * @param server server whose configuration is read and whose state feeds the gauges
+  */
+ public static void defaultInit(DLedgerServer server) {
+     DLedgerConfig config = server.getdLedgerConfig();
+     MetricsExporterType type = config.getMetricsExporterType();
+     // The setter accepts null; treat it like DISABLE instead of failing inside checkConfig.
+     if (type == null || type == MetricsExporterType.DISABLE) {
+         return;
+     }
+
+     if (!checkConfig(config)) {
+         LOGGER.error("Metrics exporter config is invalid, please check it");
+         return;
+     }
+
+     SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder().setResource(Resource.empty());
+
+     if (type == MetricsExporterType.OTLP_GRPC) {
+         String endpoint = config.getMetricsGrpcExporterTarget();
+         // A target without a scheme is assumed to be an https endpoint.
+         if (!endpoint.startsWith("http")) {
+             endpoint = "https://" + endpoint;
+         }
+         OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder()
+             .setEndpoint(endpoint)
+             .setTimeout(config.getMetricGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS);
+
+         String headers = config.getMetricsGrpcExporterHeader();
+         if (StringUtils.isNotBlank(headers)) {
+             parseGrpcExporterHeaders(headers).forEach(metricExporterBuilder::addHeader);
+         }
+
+         otlpGrpcMetricExporter = metricExporterBuilder.build();
+
+         periodicMetricReader = PeriodicMetricReader.builder(otlpGrpcMetricExporter)
+             .setInterval(config.getMetricGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS)
+             .build();
+
+         providerBuilder.registerMetricReader(periodicMetricReader);
+     }
+
+     if (type == MetricsExporterType.PROM) {
+         String promExporterHost = config.getMetricsPromExporterHost();
+         if (StringUtils.isBlank(promExporterHost)) {
+             promExporterHost = "0.0.0.0";
+         }
+         prometheusHttpServer = PrometheusHttpServer.builder()
+             .setHost(promExporterHost)
+             .setPort(config.getMetricsPromExporterPort())
+             .build();
+         providerBuilder.registerMetricReader(prometheusHttpServer);
+     }
+
+     if (type == MetricsExporterType.LOG) {
+         // Route java.util.logging through SLF4J and set the exporter's JUL logger to FINEST.
+         SLF4JBridgeHandler.removeHandlersForRootLogger();
+         SLF4JBridgeHandler.install();
+         loggingMetricsExporter = LoggingMetricExporter.create(AggregationTemporality.CUMULATIVE);
+         java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
+         periodicMetricReader = PeriodicMetricReader.builder(loggingMetricsExporter)
+             .setInterval(config.getMetricLoggingExporterIntervalInMills(), TimeUnit.MILLISECONDS)
+             .build();
+         providerBuilder.registerMetricReader(periodicMetricReader);
+     }
+
+     registerMetricsView(providerBuilder);
+
+     Meter meter = OpenTelemetrySdk.builder().setMeterProvider(providerBuilder.build())
+         .build().getMeter(METER_NAME);
+     // Each call of the supplier yields a new builder pre-filled with this node's group and self id.
+     init(meter, () -> Attributes.builder().put(LABEL_GROUP, config.getGroup()).put(LABEL_SELF_ID, config.getSelfId()), server);
+ }
+
+ /**
+  * Parses comma-separated {@code key:value} pairs into a map.
+  *
+  * <p>Only the first colon separates key from value, so values may contain colons. Entries without
+  * a key or a value are skipped with a warning; the warning does not echo the entry because header
+  * values may carry credentials. A later entry with the same key replaces an earlier one.
+  */
+ private static Map<String, String> parseGrpcExporterHeaders(String headers) {
+     Map<String, String> headerMap = new HashMap<>();
+     List<String> headerList = Splitter.on(',').omitEmptyStrings().splitToList(headers);
+     for (String header : headerList) {
+         String[] pair = header.split(":", 2);
+         if (pair.length != 2 || StringUtils.isBlank(pair[0]) || StringUtils.isBlank(pair[1])) {
+             LOGGER.warn("metricsGrpcExporterHeader contains an entry that is not in key:value form, entry skipped");
+             continue;
+         }
+         headerMap.put(pair[0].trim(), pair[1].trim());
+     }
+     return headerMap;
+ }
+
+ /**
+  * Registers one explicit-bucket histogram view for each latency histogram.
+  *
+  * @param providerBuilder meter provider builder the views are registered on
+  */
+ private static void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
+     // bucket boundaries, built from the us/ms/s factors of this class
+     List<Double> bucketBoundaries = Arrays.asList(
+         1 * us, 3 * us, 5 * us,
+         10 * us, 30 * us, 50 * us,
+         100 * us, 300 * us, 500 * us,
+         1 * ms, 2 * ms, 3 * ms, 5 * ms,
+         10 * ms, 30 * ms, 50 * ms,
+         100 * ms, 300 * ms, 500 * ms,
+         1 * s, 3 * s, 5 * s,
+         10 * s
+     );
+
+     View latencyView = View.builder()
+         .setAggregation(Aggregation.explicitBucketHistogram(bucketBoundaries))
+         .build();
+
+     // every latency histogram shares the same view; the order below is the registration order
+     String[] latencyHistogramNames = {
+         HISTOGRAM_APPEND_ENTRY_LATENCY,
+         HISTOGRAM_REPLICATE_ENTRY_LATENCY,
+         HISTOGRAM_APPLY_TASK_LATENCY,
+         HISTOGRAM_READ_LATENCY,
+         HISTOGRAM_SAVE_SNAPSHOT_LATENCY,
+         HISTOGRAM_LOAD_SNAPSHOT_LATENCY,
+         HISTOGRAM_INSTALL_SNAPSHOT_LATENCY
+     };
+
+     for (String histogramName : latencyHistogramNames) {
+         InstrumentSelector selector = InstrumentSelector.builder()
+             .setType(InstrumentType.HISTOGRAM)
+             .setName(histogramName)
+             .build();
+         providerBuilder.registerView(selector, latencyView);
+     }
+ }
+
+}
+
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/MetricsExporterType.java b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/MetricsExporterType.java
new file mode 100644
index 00000000..b36681aa
--- /dev/null
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/MetricsExporterType.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2017-2022 The DLedger Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.storage.dledger.metrics;
+
+
+/** Metrics exporter kinds, each tagged with an integer code. */
+public enum MetricsExporterType {
+    DISABLE(0),
+    OTLP_GRPC(1),
+    PROM(2),
+    LOG(3);
+
+    private final int value;
+
+    MetricsExporterType(int code) {
+        value = code;
+    }
+
+    /** Returns the integer code attached to this constant. */
+    public int getValue() {
+        return value;
+    }
+
+    /** Maps an integer code to its constant; unrecognised codes yield {@link #DISABLE}. */
+    public static MetricsExporterType valueOf(int value) {
+        for (MetricsExporterType candidate : values()) {
+            if (candidate.value == value) {
+                return candidate;
+            }
+        }
+        return DISABLE;
+    }
+
+    /** Tells whether this constant's code is positive, which holds for all but {@link #DISABLE}. */
+    public boolean isEnable() {
+        return value > 0;
+    }
+}
\ No newline at end of file
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopLongCounter.java b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopLongCounter.java
new file mode 100644
index 00000000..aaa9cd5d
--- /dev/null
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopLongCounter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017-2022 The DLedger Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.storage.dledger.metrics;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.context.Context;
+
+/**
+ * {@link LongCounter} implementation in which every {@code add} overload has an empty body,
+ * so any increment passed to it is dropped.
+ */
+public class NopLongCounter implements LongCounter {
+
+    @Override
+    public void add(long value) {
+        // no-op
+    }
+
+    @Override
+    public void add(long value, Attributes attributes) {
+        // no-op
+    }
+
+    @Override
+    public void add(long value, Attributes attributes, Context context) {
+        // no-op
+    }
+}
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopLongHistogram.java b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopLongHistogram.java
new file mode 100644
index 00000000..ff3432f4
--- /dev/null
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopLongHistogram.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2017-2022 The DLedger Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.storage.dledger.metrics;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.context.Context;
+
+/**
+ * {@link LongHistogram} implementation in which every {@code record} overload has an empty
+ * body, so any measurement passed to it is dropped.
+ */
+public class NopLongHistogram implements LongHistogram {
+
+    @Override
+    public void record(long value) {
+        // no-op
+    }
+
+    @Override
+    public void record(long value, Attributes attributes) {
+        // no-op
+    }
+
+    @Override
+    public void record(long value, Attributes attributes, Context context) {
+        // no-op
+    }
+}
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopObservableLongGauge.java b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopObservableLongGauge.java
new file mode 100644
index 00000000..aaa4864e
--- /dev/null
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/metrics/NopObservableLongGauge.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2017-2022 The DLedger Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.storage.dledger.metrics;
+
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+
+/**
+ * Empty {@link ObservableLongGauge} implementation: the class body declares no members and
+ * overrides nothing from the interface.
+ */
+public class NopObservableLongGauge implements ObservableLongGauge {
+}
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java
index 859da226..554667d6 100644
--- a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java
@@ -155,6 +155,10 @@ public SnapshotReader getSnapshotReader() {
}
}
+ /**
+  * Returns the snapshot count as reported by the underlying snapshot store.
+  *
+  * @return the result of {@code snapshotStore.getSnapshotNum()}
+  */
+ public long getSnapshotNum() {
+ return this.snapshotStore.getSnapshotNum();
+ }
+
public void saveSnapshot() {
// Check if still saving other snapshots
if (this.savingSnapshot) {
@@ -243,25 +247,7 @@ private void truncatePrefix(long lastIncludedIndex, long lastIncludedTerm) {
private void deleteExpiredSnapshot() {
// Remove the oldest snapshot
- DLedgerConfig config = dLedgerServer.getDLedgerConfig();
- File[] snapshotFiles = new File(config.getSnapshotStoreBaseDir()).listFiles();
- if (snapshotFiles != null && snapshotFiles.length > config.getMaxSnapshotReservedNum()) {
- long minSnapshotIdx = Long.MAX_VALUE;
- for (File file : snapshotFiles) {
- String fileName = file.getName();
- if (!fileName.startsWith(SnapshotManager.SNAPSHOT_DIR_PREFIX)) {
- continue;
- }
- minSnapshotIdx = Math.min(Long.parseLong(fileName.substring(SnapshotManager.SNAPSHOT_DIR_PREFIX.length())), minSnapshotIdx);
- }
- String deleteFilePath = config.getSnapshotStoreBaseDir() + File.separator + SnapshotManager.SNAPSHOT_DIR_PREFIX + minSnapshotIdx;
- try {
- IOUtils.deleteFile(new File(deleteFilePath));
- logger.info("Delete expired snapshot: {}", deleteFilePath);
- } catch (IOException e) {
- logger.error("Unable to remove expired snapshot: {}", deleteFilePath, e);
- }
- }
+ snapshotStore.deleteExpiredSnapshot(dLedgerConfig.getMaxSnapshotReservedNum());
}
public CompletableFuture loadSnapshot() {
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotStore.java
index 8c7e1526..224c11c4 100644
--- a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotStore.java
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotStore.java
@@ -23,4 +23,8 @@ public interface SnapshotStore {
SnapshotReader createSnapshotReader();
boolean downloadSnapshot(DownloadSnapshot downloadSnapshot);
+
+ /**
+  * Deletes expired snapshots so that at most {@code maxReservedSnapshotNum} remain.
+  * NOTE(review): which snapshots count as expired is up to the implementation; the
+  * file-based store removes the lowest-indexed ones first.
+  *
+  * @param maxReservedSnapshotNum maximum number of snapshots to keep
+  */
+ void deleteExpiredSnapshot(long maxReservedSnapshotNum);
+
+ /**
+  * Returns how many snapshots this store holds.
+  *
+  * @return the snapshot count
+  */
+ long getSnapshotNum();
}
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotStore.java
index 68134d01..99602060 100644
--- a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotStore.java
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotStore.java
@@ -24,6 +24,11 @@
import io.openmessaging.storage.dledger.snapshot.SnapshotWriter;
import io.openmessaging.storage.dledger.utils.IOUtils;
import java.io.FileOutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +37,7 @@
public class FileSnapshotStore implements SnapshotStore {
- private static Logger logger = LoggerFactory.getLogger(FileSnapshotStore.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileSnapshotStore.class);
private final String snapshotStoreBaseDir;
@@ -47,7 +52,7 @@ private void initStore() {
try {
IOUtils.mkDir(dir);
} catch (IOException e) {
- logger.error("Unable to create snapshot storage directory {}", this.snapshotStoreBaseDir, e);
+ LOGGER.error("Unable to create snapshot storage directory {}", this.snapshotStoreBaseDir, e);
throw new RuntimeException(e);
}
// Clean temp directory to remove existing dirty snapshots
@@ -56,7 +61,7 @@ private void initStore() {
try {
IOUtils.deleteFile(tmpSnapshot);
} catch (IOException e) {
- logger.error("Unable to clean temp snapshots {}", tmpSnapshot.getPath(), e);
+ LOGGER.error("Unable to clean temp snapshots {}", tmpSnapshot.getPath(), e);
throw new RuntimeException(e);
}
}
@@ -68,32 +73,30 @@ public SnapshotWriter createSnapshotWriter() {
}
private SnapshotWriter createSnapshotWriter(String snapshotStorePath) {
- // Delete temp snapshot
- String tmpSnapshotStorePath = snapshotStorePath;
- if (new File(tmpSnapshotStorePath).exists()) {
+ if (new File(snapshotStorePath).exists()) {
try {
- IOUtils.deleteFile(new File(tmpSnapshotStorePath));
+ IOUtils.deleteFile(new File(snapshotStorePath));
} catch (IOException e) {
- logger.error("Unable to delete temp snapshot: {}", tmpSnapshotStorePath, e);
+ LOGGER.error("Unable to delete temp snapshot: {}", snapshotStorePath, e);
return null;
}
}
// Create tmp directory for writing snapshots
- File dir = new File(tmpSnapshotStorePath);
+ File dir = new File(snapshotStorePath);
try {
IOUtils.mkDir(dir);
} catch (IOException e) {
- logger.error("Unable to create snapshot storage directory: " + tmpSnapshotStorePath, e);
+ LOGGER.error("Unable to create snapshot storage directory: " + snapshotStorePath, e);
return null;
}
- return new FileSnapshotWriter(tmpSnapshotStorePath, this);
+ return new FileSnapshotWriter(snapshotStorePath, this);
}
@Override
public SnapshotReader createSnapshotReader() {
long lastSnapshotIndex = getLastSnapshotIdx();
if (lastSnapshotIndex == -1) {
- logger.warn("No snapshot exists");
+ LOGGER.warn("No snapshot exists");
return null;
}
String snapshotStorePath = this.snapshotStoreBaseDir + File.separator +
@@ -110,7 +113,7 @@ public boolean downloadSnapshot(DownloadSnapshot downloadSnapshot) {
try {
IOUtils.deleteFile(installTmpDirFile);
} catch (IOException e) {
- logger.error("Unable to delete temp install snapshot: {}", installTmpDir, e);
+ LOGGER.error("Unable to delete temp install snapshot: {}", installTmpDir, e);
return false;
}
}
@@ -118,12 +121,16 @@ public boolean downloadSnapshot(DownloadSnapshot downloadSnapshot) {
try {
IOUtils.mkDir(installTmpDirFile);
} catch (IOException e) {
- logger.error("Unable to create temp install snapshot dir: {}", installTmpDir, e);
+ LOGGER.error("Unable to create temp install snapshot dir: {}", installTmpDir, e);
return false;
}
// write meta and data to temp install snapshot dir and then move it to snapshot store dir
try {
SnapshotWriter writer = createSnapshotWriter(installTmpDir);
+ if (writer == null) {
+ LOGGER.error("Unable to create snapshot writer for install snapshot: {}", downloadSnapshot);
+ return false;
+ }
writer.setSnapshotMeta(downloadSnapshot.getMeta());
FileOutputStream fileOutputStream = new FileOutputStream(writer.getSnapshotStorePath() + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE);
fileOutputStream.write(downloadSnapshot.getData());
@@ -132,24 +139,59 @@ public boolean downloadSnapshot(DownloadSnapshot downloadSnapshot) {
writer.save(SnapshotStatus.SUCCESS);
return true;
} catch (Exception e) {
- logger.error("Unable to write snapshot: {} data to install snapshot", downloadSnapshot, e);
+ LOGGER.error("Unable to write snapshot: {} data to install snapshot", downloadSnapshot, e);
return false;
}
}
+ @Override
+ public void deleteExpiredSnapshot(long maxReservedSnapshotNum) {
+ // Remove the oldest snapshot
+ List realSnapshotFiles = getSnapshotFiles().stream().sorted((o1, o2) -> {
+ long idx1 = Long.parseLong(o1.getName().substring(SnapshotManager.SNAPSHOT_DIR_PREFIX.length()));
+ long idx2 = Long.parseLong(o2.getName().substring(SnapshotManager.SNAPSHOT_DIR_PREFIX.length()));
+ return Long.compare(idx1, idx2);
+ }
+ ).collect(Collectors.toList());
+ if (realSnapshotFiles.size() <= maxReservedSnapshotNum) {
+ return;
+ }
+ realSnapshotFiles.stream().limit(realSnapshotFiles.size() - maxReservedSnapshotNum).forEach(file -> {
+ try {
+ IOUtils.deleteFile(file);
+ LOGGER.info("Delete expired snapshot: {}", file.getPath());
+ } catch (IOException e) {
+ LOGGER.error("Unable to remove expired snapshot: {}", file.getPath(), e);
+ }
+ });
+ }
+
+ /**
+  * Counts the entries directly under the snapshot base directory whose name starts with
+  * {@code SnapshotManager.SNAPSHOT_DIR_PREFIX}.
+  *
+  * @return number of such entries; 0 when the directory cannot be listed or is empty
+  */
+ @Override
+ public long getSnapshotNum() {
+ return getSnapshotFiles().size();
+ }
+
private long getLastSnapshotIdx() {
- File[] snapshotFiles = new File(this.snapshotStoreBaseDir).listFiles();
- long lastSnapshotIdx = -1;
- if (snapshotFiles != null && snapshotFiles.length > 0) {
- for (File snapshotFile : snapshotFiles) {
- String fileName = snapshotFile.getName();
- if (!fileName.startsWith(SnapshotManager.SNAPSHOT_DIR_PREFIX)) {
- continue;
- }
- lastSnapshotIdx = Math.max(Long.parseLong(fileName.substring(SnapshotManager.SNAPSHOT_DIR_PREFIX.length())), lastSnapshotIdx);
+ Optional optionalFile = getSnapshotFiles().stream().min((o1, o2) -> {
+ long idx1 = Long.parseLong(o1.getName().substring(SnapshotManager.SNAPSHOT_DIR_PREFIX.length()));
+ long idx2 = Long.parseLong(o2.getName().substring(SnapshotManager.SNAPSHOT_DIR_PREFIX.length()));
+ return Long.compare(idx2, idx1);
}
+ );
+ long index = -1;
+ if (optionalFile.isPresent()) {
+ File file = optionalFile.get();
+ index = Long.parseLong(file.getName().substring(SnapshotManager.SNAPSHOT_DIR_PREFIX.length()));
+ }
+ return index;
+ }
+
+ private List getSnapshotFiles() {
+ File[] snapshotFiles = new File(this.snapshotStoreBaseDir).listFiles();
+ if (snapshotFiles == null || snapshotFiles.length == 0) {
+ return Collections.emptyList();
}
- return lastSnapshotIdx;
+ return Arrays.stream(snapshotFiles).filter(file -> file.getName().startsWith(SnapshotManager.SNAPSHOT_DIR_PREFIX)).collect(Collectors.toList());
}
public String getSnapshotStoreBaseDir() {
diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java
index 1366fd1f..340c80f9 100644
--- a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java
+++ b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java
@@ -22,6 +22,7 @@
import io.openmessaging.storage.dledger.common.ShutdownAbleThread;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
+import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager;
import io.openmessaging.storage.dledger.snapshot.SnapshotManager;
import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
import io.openmessaging.storage.dledger.snapshot.SnapshotStatus;
@@ -32,6 +33,7 @@
import io.openmessaging.storage.dledger.snapshot.hook.SnapshotHook;
import io.openmessaging.storage.dledger.store.DLedgerStore;
+import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
@@ -41,6 +43,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -187,8 +190,12 @@ private void doCommitted(final long committedIndex) {
return;
}
final ApplyEntryIterator iter = new ApplyEntryIterator(this.dLedgerStore, committedIndex, lastAppliedIndex, this.completeEntryCallback);
+ StopWatch watch = StopWatch.createStarted();
this.statemachine.onApply(iter);
+ Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().build();
+ DLedgerMetricsManager.applyTaskLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes);
final long lastIndex = iter.getIndex();
+ DLedgerMetricsManager.applyTaskBatchCount.record(lastIndex - lastAppliedIndex, attributes);
DLedgerEntry entry = this.dLedgerStore.get(lastIndex);
this.memberState.updateAppliedIndexAndTerm(lastIndex, entry.getTerm());
// Take snapshot
@@ -225,6 +232,7 @@ private void doSnapshotLoad(LoadSnapshotHook loadSnapshotAfter) {
loadSnapshotAfter.doCallBack(SnapshotStatus.EXPIRED);
return;
}
+ StopWatch watch = StopWatch.createStarted();
// Load data from the state machine
try {
if (!this.statemachine.onSnapshotLoad(reader)) {
@@ -237,6 +245,7 @@ private void doSnapshotLoad(LoadSnapshotHook loadSnapshotAfter) {
loadSnapshotAfter.doCallBack(SnapshotStatus.FAIL);
return;
}
+ DLedgerMetricsManager.loadSnapshotLatency.record(watch.getTime(TimeUnit.MICROSECONDS), DLedgerMetricsManager.newAttributesBuilder().build());
// Update statemachine info
this.memberState.updateAppliedIndexAndTerm(snapshotIndex, snapshotTerm);
this.memberState.leaderUpdateCommittedIndex(snapshotTerm, snapshotIndex);
@@ -251,6 +260,7 @@ private void doSnapshotSave(SaveSnapshotHook saveSnapshotAfter) {
if (writer == null) {
return;
}
+ StopWatch watch = StopWatch.createStarted();
// Save data through the state machine
try {
if (!this.statemachine.onSnapshotSave(writer)) {
@@ -263,6 +273,7 @@ private void doSnapshotSave(SaveSnapshotHook saveSnapshotAfter) {
saveSnapshotAfter.doCallBack(SnapshotStatus.FAIL);
return;
}
+ DLedgerMetricsManager.saveSnapshotLatency.record(watch.getTime(TimeUnit.MICROSECONDS), DLedgerMetricsManager.newAttributesBuilder().build());
saveSnapshotAfter.doCallBack(SnapshotStatus.SUCCESS);
}
diff --git a/example/src/main/java/io/openmessaging/storage/dledger/example/register/RegisterDLedger.java b/example/src/main/java/io/openmessaging/storage/dledger/example/register/RegisterDLedger.java
index b1185441..e1854732 100644
--- a/example/src/main/java/io/openmessaging/storage/dledger/example/register/RegisterDLedger.java
+++ b/example/src/main/java/io/openmessaging/storage/dledger/example/register/RegisterDLedger.java
@@ -20,6 +20,7 @@
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.example.register.protocol.RegisterReadProcessor;
import io.openmessaging.storage.dledger.example.register.protocol.RegisterWriteProcessor;
+import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +35,7 @@ public static void bootstrap(DLedgerConfig dLedgerConfig) {
dLedgerServer.registerUserDefineProcessors(
Arrays.asList(new RegisterWriteProcessor(dLedgerServer), new RegisterReadProcessor(dLedgerServer))
);
+ DLedgerMetricsManager.defaultInit(dLedgerServer);
dLedgerServer.startup();
LOGGER.info("RegisterDLedger started");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
diff --git a/example/src/main/java/io/openmessaging/storage/dledger/example/register/command/RegisterCommand.java b/example/src/main/java/io/openmessaging/storage/dledger/example/register/command/RegisterCommand.java
index 5dacaea9..d100b77e 100644
--- a/example/src/main/java/io/openmessaging/storage/dledger/example/register/command/RegisterCommand.java
+++ b/example/src/main/java/io/openmessaging/storage/dledger/example/register/command/RegisterCommand.java
@@ -21,6 +21,7 @@
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.example.common.command.BaseCommand;
import io.openmessaging.storage.dledger.example.register.RegisterDLedger;
+import io.openmessaging.storage.dledger.metrics.MetricsExporterType;
import java.io.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,12 @@ public class RegisterCommand extends BaseCommand {
@Parameter(names = {"--max-batch-append-interval"}, description = "Max batch append interval in ms")
private int maxBatchAppendIntervalMs = 10;
+ @Parameter(names = {"--metrics-exporter-type"}, description = "Metrics exporter type")
+ private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
+
+ @Parameter(names = {"--metrics-prom-export-port"}, description = "Metrics prometheus export port")
+ private int metricsPromExportPort = 5557;
+
@Override
public void doCommand() {
try {
@@ -95,6 +102,8 @@ private DLedgerConfig buildDLedgerConfig() {
dLedgerConfig.setEnableBatchAppend(enableBatchAppend);
dLedgerConfig.setMaxBatchAppendSize(maxBatchAppendSize);
dLedgerConfig.setMaxBatchAppendSize(maxBatchAppendIntervalMs);
+ dLedgerConfig.setMetricsExporterType(metricsExporterType);
+ dLedgerConfig.setMetricsPromExporterPort(metricsPromExportPort);
return dLedgerConfig;
}
}
diff --git a/pom.xml b/pom.xml
index adb53d75..36729679 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,9 @@
5.1.0
jacoco
3.12.0
+ 1.26.0
+ 1.26.0-alpha
+ 2.0.6
${project.basedir}/../test/target/jacoco-it.exec
file:**/generated-sources/**,**/test/**
@@ -116,6 +119,31 @@
commons-lang3
${commons-lang3.version}
+
+ io.opentelemetry
+ opentelemetry-exporter-otlp
+ ${opentelemetry.version}
+
+
+ io.opentelemetry
+ opentelemetry-exporter-prometheus
+ ${opentelemetry-exporter-prometheus.version}
+
+
+ io.opentelemetry
+ opentelemetry-exporter-logging
+ ${opentelemetry.version}
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+ ${opentelemetry.version}
+
+
+ org.slf4j
+ jul-to-slf4j
+ ${jul-to-slf4j.version}
+
diff --git a/script/deploy-three-register.sh b/script/deploy-three-register.sh
new file mode 100755
index 00000000..c713f488
--- /dev/null
+++ b/script/deploy-three-register.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+#
+# Copyright 2017-2022 The DLedger Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+nohup java -jar ../example/target/dledger-example.jar register --peers "n0-localhost:20911;n1-localhost:20912;n2-localhost:20913" --id n0 --metrics-exporter-type PROM --metrics-prom-export-port 5557 > register-n0.log 2>&1 &
+
+nohup java -jar ../example/target/dledger-example.jar register --peers "n0-localhost:20911;n1-localhost:20912;n2-localhost:20913" --id n1 --metrics-exporter-type PROM --metrics-prom-export-port 5558 > register-n1.log 2>&1 &
+
+nohup java -jar ../example/target/dledger-example.jar register --peers "n0-localhost:20911;n1-localhost:20912;n2-localhost:20913" --id n2 --metrics-exporter-type PROM --metrics-prom-export-port 5559 > register-n2.log 2>&1 &
+