Skip to content

Commit

Permalink
[ISSUE #307] Support metrics (#308)
Browse files Browse the repository at this point in the history
* feat(core): add metrics manager

1. add metrics manager

* feat(core): record metrics

1. record metrics

* fix(core): avoid NPE

1. avoid NPE

* style(core): fix checkstyle

1. fix checkstyle

* feat(core): optimize metrics name

1. optimize metrics name

* rerun

* rerun
  • Loading branch information
TheR1sing3un committed Jul 25, 2023
1 parent f0ad185 commit cfd4191
Show file tree
Hide file tree
Showing 18 changed files with 904 additions and 46 deletions.
20 changes: 20 additions & 0 deletions dledger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,26 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-prometheus</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.openmessaging.storage.dledger;

import io.openmessaging.storage.dledger.metrics.MetricsExporterType;
import io.openmessaging.storage.dledger.snapshot.SnapshotEntryResetStrategy;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;

Expand Down Expand Up @@ -119,6 +120,17 @@ public class DLedgerConfig {
*/
private boolean enableFastAdvanceCommitIndex = false;

private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;

private String metricsGrpcExporterTarget = "";
private String metricsGrpcExporterHeader = "";
private long metricGrpcExporterTimeOutInMills = 3 * 1000;
private long metricGrpcExporterIntervalInMills = 60 * 1000;
private long metricLoggingExporterIntervalInMills = 10 * 1000;

private int metricsPromExporterPort = 5557;
private String metricsPromExporterHost = "";

public String getDefaultPath() {
return storeBaseDir + File.separator + "dledger-" + selfId;
}
Expand Down Expand Up @@ -559,4 +571,68 @@ public boolean isEnableFastAdvanceCommitIndex() {
public void setEnableFastAdvanceCommitIndex(boolean enableFastAdvanceCommitIndex) {
this.enableFastAdvanceCommitIndex = enableFastAdvanceCommitIndex;
}

public void setMetricsExporterType(MetricsExporterType metricsExporterType) {
this.metricsExporterType = metricsExporterType;
}

public MetricsExporterType getMetricsExporterType() {
return metricsExporterType;
}

public void setMetricsGrpcExporterTarget(String metricsGrpcExporterTarget) {
this.metricsGrpcExporterTarget = metricsGrpcExporterTarget;
}

public void setMetricsGrpcExporterHeader(String metricsGrpcExporterHeader) {
this.metricsGrpcExporterHeader = metricsGrpcExporterHeader;
}

public void setMetricGrpcExporterTimeOutInMills(long metricGrpcExporterTimeOutInMills) {
this.metricGrpcExporterTimeOutInMills = metricGrpcExporterTimeOutInMills;
}

public void setMetricGrpcExporterIntervalInMills(long metricGrpcExporterIntervalInMills) {
this.metricGrpcExporterIntervalInMills = metricGrpcExporterIntervalInMills;
}

public void setMetricLoggingExporterIntervalInMills(long metricLoggingExporterIntervalInMills) {
this.metricLoggingExporterIntervalInMills = metricLoggingExporterIntervalInMills;
}

public void setMetricsPromExporterPort(int metricsPromExporterPort) {
this.metricsPromExporterPort = metricsPromExporterPort;
}

public void setMetricsPromExporterHost(String metricsPromExporterHost) {
this.metricsPromExporterHost = metricsPromExporterHost;
}

public String getMetricsGrpcExporterTarget() {
return metricsGrpcExporterTarget;
}

public String getMetricsGrpcExporterHeader() {
return metricsGrpcExporterHeader;
}

public long getMetricGrpcExporterTimeOutInMills() {
return metricGrpcExporterTimeOutInMills;
}

public long getMetricGrpcExporterIntervalInMills() {
return metricGrpcExporterIntervalInMills;
}

public long getMetricLoggingExporterIntervalInMills() {
return metricLoggingExporterIntervalInMills;
}

public int getMetricsPromExporterPort() {
return metricsPromExporterPort;
}

public String getMetricsPromExporterHost() {
return metricsPromExporterHost;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.openmessaging.storage.dledger.common.WriteClosure;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse;
Expand All @@ -41,6 +42,7 @@
import io.openmessaging.storage.dledger.utils.PreConditions;
import io.openmessaging.storage.dledger.utils.Quota;

import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -56,9 +58,12 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.LABEL_REMOTE_ID;

public class DLedgerEntryPusher {

private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerEntryPusher.class);
Expand Down Expand Up @@ -708,6 +713,9 @@ private void sendBatchAppendEntryRequest() throws Exception {
final long firstIndex = batchAppendEntryRequest.getFirstEntryIndex();
final long lastIndex = batchAppendEntryRequest.getLastEntryIndex();
final long lastTerm = batchAppendEntryRequest.getLastEntryTerm();
final long entriesCount = batchAppendEntryRequest.getCount();
final long entriesSize = batchAppendEntryRequest.getTotalSize();
StopWatch watch = StopWatch.createStarted();
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(batchAppendEntryRequest);
pendingMap.put(firstIndex, new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount()));
responseFuture.whenComplete((x, ex) -> {
Expand All @@ -716,6 +724,10 @@ private void sendBatchAppendEntryRequest() throws Exception {
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
switch (responseCode) {
case SUCCESS:
Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().put(LABEL_REMOTE_ID, this.peerId).build();
DLedgerMetricsManager.replicateEntryLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes);
DLedgerMetricsManager.replicateEntryBatchCount.record(entriesCount, attributes);
DLedgerMetricsManager.replicateEntryBatchBytes.record(entriesSize, attributes);
pendingMap.remove(firstIndex);
if (lastIndex > matchIndex) {
matchIndex = lastIndex;
Expand Down Expand Up @@ -767,12 +779,15 @@ private void doInstallSnapshot() throws Exception {
long lastIncludedIndex = snapshot.getMeta().getLastIncludedIndex();
long lastIncludedTerm = snapshot.getMeta().getLastIncludedTerm();
InstallSnapshotRequest request = buildInstallSnapshotRequest(snapshot);
StopWatch watch = StopWatch.createStarted();
CompletableFuture<InstallSnapshotResponse> future = DLedgerEntryPusher.this.dLedgerRpcService.installSnapshot(request);
InstallSnapshotResponse response = future.get(3, TimeUnit.SECONDS);
PreConditions.check(response != null, DLedgerResponseCode.INTERNAL_ERROR, "installSnapshot lastIncludedIndex=%d", writeIndex);
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(response.getCode());
switch (responseCode) {
case SUCCESS:
Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().put(LABEL_REMOTE_ID, this.peerId).build();
DLedgerMetricsManager.installSnapshotLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes);
logger.info("[DoInstallSnapshot-{}]install snapshot success, lastIncludedIndex = {}, lastIncludedTerm", peerId, lastIncludedIndex, lastIncludedTerm);
if (lastIncludedIndex > matchIndex) {
matchIndex = lastIncludedIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.entry.DLedgerEntryType;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
Expand Down Expand Up @@ -59,6 +60,8 @@
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.PreConditions;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -72,6 +75,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;

import org.apache.commons.lang3.time.StopWatch;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
Expand All @@ -80,6 +84,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant.LABEL_READ_MODE;

public class DLedgerServer extends AbstractDLedgerServer {

private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerServer.class);
Expand Down Expand Up @@ -234,7 +240,6 @@ public synchronized void registerStateMachine(final StateMachine fsm) {
}
this.fsmCaller = fsmCaller;
// Register state machine caller to entry pusher
this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
if (dLedgerStore instanceof DLedgerMmapFileStore) {
((DLedgerMmapFileStore) dLedgerStore).setEnableCleanSpaceService(false);
}
Expand Down Expand Up @@ -339,10 +344,13 @@ public AppendFuture<AppendEntryResponse> appendAsLeader(List<byte[]> bodies) thr
return AppendFuture.newCompletedFuture(-1, null);
}
AppendFuture<AppendEntryResponse> future;
StopWatch watch = StopWatch.createStarted();
DLedgerEntry entry = new DLedgerEntry();
long totalBytes = 0;
if (bodies.size() > 1) {
long[] positions = new long[bodies.size()];
for (int i = 0; i < bodies.size(); i++) {
totalBytes += bodies.get(i).length;
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(bodies.get(i));
entry = dLedgerStore.appendAsLeader(dLedgerEntry);
Expand All @@ -352,12 +360,23 @@ public AppendFuture<AppendEntryResponse> appendAsLeader(List<byte[]> bodies) thr
future = new BatchAppendFuture<>(positions);
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
totalBytes += bodies.get(0).length;
dLedgerEntry.setBody(bodies.get(0));
entry = dLedgerStore.appendAsLeader(dLedgerEntry);
future = new AppendFuture<>();
}
final DLedgerEntry finalResEntry = entry;
final AppendFuture<AppendEntryResponse> finalFuture = future;
final long totalBytesFinal = totalBytes;
finalFuture.handle((r, e) -> {
if (e == null && r.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {
Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().build();
DLedgerMetricsManager.appendEntryLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes);
DLedgerMetricsManager.appendEntryBatchCount.record(bodies.size(), attributes);
DLedgerMetricsManager.appendEntryBatchBytes.record(totalBytesFinal, attributes);
}
return r;
});
Closure closure = new Closure() {
@Override
public void done(Status status) {
Expand Down Expand Up @@ -421,10 +440,22 @@ private void dealUnsafeRead(ReadClosure closure) throws DLedgerException {

private void dealRaftLogRead(ReadClosure closure) throws DLedgerException {
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
StopWatch watch = StopWatch.createStarted();
// append an empty raft log, call closure when this raft log is applied
DLedgerEntry emptyEntry = new DLedgerEntry(DLedgerEntryType.NOOP);
DLedgerEntry dLedgerEntry = dLedgerStore.appendAsLeader(emptyEntry);
dLedgerEntryPusher.appendClosure(closure, dLedgerEntry.getTerm(), dLedgerEntry.getIndex());
Closure realClosure = new Closure() {
@Override
public void done(Status status) {
if (status.isOk()) {
AttributesBuilder attributesBuilder = DLedgerMetricsManager.newAttributesBuilder();
attributesBuilder.put(LABEL_READ_MODE, ReadMode.RAFT_LOG_READ.name().toLowerCase());
DLedgerMetricsManager.readLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributesBuilder.build());
}
closure.done(status);
}
};
dLedgerEntryPusher.appendClosure(realClosure, dLedgerEntry.getTerm(), dLedgerEntry.getIndex());
}

private void dealReadIndexRead(ReadClosure closure) throws DLedgerException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.metrics;

public class DLedgerMetricsConstant {

// metrics name

public static final String HISTOGRAM_APPEND_ENTRY_LATENCY = "dledger_append_entry_latency";

public static final String HISTOGRAM_APPEND_ENTRY_BATCH_BYTES = "dledger_append_entry_batch_bytes";

public static final String HISTOGRAM_APPEND_ENTRY_BATCH_COUNT = "dledger_append_entry_batch_count";

public static final String HISTOGRAM_REPLICATE_ENTRY_LATENCY = "dledger_replicate_entry_latency";

public static final String HISTOGRAM_REPLICATE_ENTRY_BATCH_BYTES = "dledger_replicate_entry_batch_bytes";

public static final String HISTOGRAM_REPLICATE_ENTRY_BATCH_COUNT = "dledger_replicate_entry_batch_count";

public static final String HISTOGRAM_APPLY_TASK_LATENCY = "dledger_apply_task_latency";

public static final String HISTOGRAM_APPLY_TASK_BATCH_COUNT = "dledger_apply_task_batch_count";

public static final String HISTOGRAM_READ_LATENCY = "dledger_read_latency";

public static final String HISTOGRAM_SAVE_SNAPSHOT_LATENCY = "dledger_save_snapshot_latency";

public static final String HISTOGRAM_LOAD_SNAPSHOT_LATENCY = "dledger_load_snapshot_latency";

public static final String HISTOGRAM_INSTALL_SNAPSHOT_LATENCY = "dledger_install_snapshot_latency";

public static final String GAUGE_ENTRIES_COUNT = "dledger_entries_count";

public static final String GAUGE_SNAPSHOT_COUNT = "dledger_snapshot_count";

public static final String GAUGE_ENTRY_STORE_SIZE = "dledger_entry_store_size";


// label

public static final String LABEL_GROUP = "group";

public static final String LABEL_SELF_ID = "self_id";

public static final String LABEL_REMOTE_ID = "remote_id";

public static final String LABEL_READ_MODE = "read_mode";

public static final String METER_NAME = "dledger";

}
Loading

0 comments on commit cfd4191

Please sign in to comment.