Skip to content

Commit

Permalink
[ISSUE #290] Add switch to enable/disable snapshot feature (#291)
Browse files Browse the repository at this point in the history
1. add switch to enable/disable snapshot feature

Closes #290
  • Loading branch information
TheR1sing3un committed May 29, 2023
1 parent 91acd65 commit 339149c
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class DLedgerConfig {

private long leadershipTransferWaitTimeout = 1000;

private boolean enableSnapshot = false;

private int snapshotThreshold = 1000;
private int maxSnapshotReservedNum = 3;

Expand Down Expand Up @@ -485,4 +487,12 @@ public int getMaxSnapshotReservedNum() {
public void setMaxSnapshotReservedNum(int maxSnapshotReservedNum) {
this.maxSnapshotReservedNum = maxSnapshotReservedNum;
}

public boolean isEnableSnapshot() {
return enableSnapshot;
}

public void setEnableSnapshot(boolean enableSnapshot) {
this.enableSnapshot = enableSnapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public synchronized void startup() {
this.fsmCaller.ifPresent(x -> {
// Start state machine caller and load existing snapshots for data recovery
x.start();
x.getSnapshotManager().loadSnapshot();
Optional.ofNullable(x.getSnapshotManager()).ifPresent(sm -> sm.loadSnapshot());
});
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.startup();
Expand Down Expand Up @@ -191,7 +191,9 @@ public synchronized void registerStateMachine(final StateMachine fsm) {
throw new IllegalStateException("can not register statemachine after dledger server starts");
}
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher);
fsmCaller.registerSnapshotManager(new SnapshotManager(this));
if (this.dLedgerConfig.isEnableSnapshot()) {
fsmCaller.registerSnapshotManager(new SnapshotManager(this));
}
this.fsmCaller = Optional.of(fsmCaller);
// Register state machine caller to entry pusher
this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.openmessaging.storage.dledger.store.DLedgerStore;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -87,7 +88,7 @@ public Thread newThread(Runnable r) {
});
private final Function<Long, Boolean> completeEntryCallback;
private volatile DLedgerException error;
private SnapshotManager snapshotManager;
private Optional<SnapshotManager> snapshotManager;

public StateMachineCaller(final DLedgerStore dLedgerStore, final StateMachine statemachine,
final DLedgerEntryPusher entryPusher) {
Expand All @@ -103,6 +104,7 @@ public StateMachineCaller(final DLedgerStore dLedgerStore, final StateMachine st
} else {
this.completeEntryCallback = index -> true;
}
this.snapshotManager = Optional.empty();
}

private boolean enqueueTask(final ApplyTask task) {
Expand Down Expand Up @@ -170,7 +172,7 @@ private void doCommitted(final long committedIndex) {
if (this.error != null) {
return;
}
if (this.snapshotManager.isLoadingSnapshot() || this.snapshotManager.isSavingSnapshot()) {
if (this.snapshotManager.isPresent() && (this.snapshotManager.get().isLoadingSnapshot() || this.snapshotManager.get().isSavingSnapshot())) {
this.scheduledExecutorService.schedule(() -> {
try {
onCommitted(committedIndex);
Expand All @@ -196,7 +198,7 @@ private void doCommitted(final long committedIndex) {
this.lastAppliedTerm = dLedgerEntry.getTerm();
}
// Take snapshot
snapshotManager.saveSnapshot(dLedgerEntry);
snapshotManager.ifPresent(x -> x.saveSnapshot(dLedgerEntry));
// Check response timeout.
if (iter.getCompleteAckNums() == 0) {
if (this.entryPusher != null) {
Expand Down Expand Up @@ -300,11 +302,11 @@ public long getLastAppliedTerm() {
}

public void registerSnapshotManager(SnapshotManager snapshotManager) {
this.snapshotManager = snapshotManager;
this.snapshotManager = Optional.of(snapshotManager);
}

public SnapshotManager getSnapshotManager() {
return this.snapshotManager;
return this.snapshotManager.orElse(null);
}

public DLedgerStore getdLedgerStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void testSingleServerInMemory() throws Exception {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:11001";
DLedgerServer dLedgerServer = launchServerWithStateMachine(group, peers, selfId, selfId, DLedgerConfig.MEMORY,
DLedgerServer dLedgerServer = launchServerWithStateMachineEnableSnapshot(group, peers, selfId, selfId, DLedgerConfig.MEMORY,
100000, 102400, new RegisterStateMachine());
dLedgerServer.registerUserDefineProcessors(Collections.singletonList(new RegisterReadProcessor(dLedgerServer)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St
return dLedgerServer;
}

protected synchronized DLedgerServer launchServer(String group, String peers, String selfId, String preferredLeaderId) {
protected synchronized DLedgerServer launchServer(String group, String peers, String selfId,
String preferredLeaderId) {
DLedgerConfig config = new DLedgerConfig();
config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group);
config.group(group).selfId(selfId).peers(peers);
Expand Down Expand Up @@ -73,12 +74,27 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St
return dLedgerServer;
}

protected synchronized DLedgerServer launchServerWithStateMachine(String group, String peers, String selfId, String leaderId,
String storeType, int snapshotThreshold, int mappedFileSizeForEntryData, StateMachine stateMachine) {
protected DLedgerServer launchServerWithStateMachineDisableSnapshot(String group, String peers,
String selfIf, String leaderId, String storeType, int mappedFileSizeForEntryData, StateMachine stateMachine) {
return this.launchServerWithStateMachine(group, peers, selfIf, leaderId, storeType, false, 0,
mappedFileSizeForEntryData, stateMachine);
}

protected DLedgerServer launchServerWithStateMachineEnableSnapshot(String group, String peers,
String selfId, String leaderId, String storeType, int snapshotThreshold, int mappedFileSizeForEntryData,
StateMachine stateMachine) {
return this.launchServerWithStateMachine(group, peers, selfId, leaderId, storeType, true, snapshotThreshold,
mappedFileSizeForEntryData, stateMachine);
}

protected synchronized DLedgerServer launchServerWithStateMachine(String group, String peers,
String selfId, String leaderId, String storeType, boolean enableSnapshot, int snapshotThreshold, int mappedFileSizeForEntryData,
StateMachine stateMachine) {
DLedgerConfig config = new DLedgerConfig();
config.group(group).selfId(selfId).peers(peers);
config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group);
config.setStoreType(storeType);
config.setEnableSnapshot(enableSnapshot);
config.setSnapshotThreshold(snapshotThreshold);
config.setMappedFileSizeForEntryData(mappedFileSizeForEntryData);
config.setEnableLeaderElector(false);
Expand All @@ -100,8 +116,8 @@ protected synchronized DLedgerServer launchServerWithStateMachine(String group,
return dLedgerServer;
}

protected synchronized DLedgerServer launchServerEnableBatchPush(String group, String peers, String selfId, String leaderId,
String storeType) {
protected synchronized DLedgerServer launchServerEnableBatchPush(String group, String peers, String selfId,
String leaderId, String storeType) {
DLedgerConfig config = new DLedgerConfig();
config.group(group).selfId(selfId).peers(peers);
config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public void testSaveAndLoadSnapshot() throws InterruptedException {
// Launch server
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort());
DLedgerServer dLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer dLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer dLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer dLedgerServer0 = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer dLedgerServer1 = launchServerWithStateMachineEnableSnapshot(group, peers, "n1", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer dLedgerServer2 = launchServerWithStateMachineEnableSnapshot(group, peers, "n2", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
final List<DLedgerServer> serverList = new ArrayList<DLedgerServer>() {
{
add(dLedgerServer0);
Expand Down Expand Up @@ -77,9 +77,9 @@ public void testSaveAndLoadSnapshot() throws InterruptedException {
dLedgerServer2.shutdown();
serverList.clear();
// Restart server and apply snapshot
DLedgerServer newDLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer newDLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer newDLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer newDLedgerServer0 = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer newDLedgerServer1 = launchServerWithStateMachineEnableSnapshot(group, peers, "n1", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer newDLedgerServer2 = launchServerWithStateMachineEnableSnapshot(group, peers, "n2", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
serverList.add(newDLedgerServer0);
serverList.add(newDLedgerServer1);
serverList.add(newDLedgerServer2);
Expand All @@ -99,7 +99,7 @@ public void testSnapshotReservedNum() throws InterruptedException {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = String.format("%s-localhost:%d", selfId, nextPort());
DLedgerServer server = launchServerWithStateMachine(group, peers, selfId, "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer server = launchServerWithStateMachineEnableSnapshot(group, peers, selfId, "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());

DLedgerClient dLedgerClient = launchClient(group, peers);
for (int i = 0; i < 120; i++) {
Expand Down Expand Up @@ -139,7 +139,7 @@ public void testLoadErrorSnapshot() throws Exception {
IOUtils.string2File(JSON.toJSONString(snapshotMeta), snapshotStoreBasePath + File.separator + SnapshotManager.SNAPSHOT_META_FILE);
IOUtils.string2File("80", snapshotStoreBasePath + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE);

DLedgerServer server = launchServerWithStateMachine(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer server = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 10 * 1024 * 1024, new MockStateMachine());
Thread.sleep(1000);

StateMachineCaller caller = server.getFsmCaller();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testOnCommittedAndOnSnapshotSave() throws Exception {
String leaderId = "n0";
String peers = String.format("%s-localhost:%d", selfId, nextPort());

final DLedgerServer dLedgerServer = createDLedgerServer(group, peers, selfId, leaderId);
final DLedgerServer dLedgerServer = createDLedgerServerInStateMachineMode(group, peers, selfId, leaderId);
final Pair<StateMachineCaller, MockStateMachine> result = mockCaller(dLedgerServer);
updateFileStore((DLedgerMmapFileStore) dLedgerServer.getDLedgerStore(), 10);
final StateMachineCaller caller = result.getKey();
Expand Down Expand Up @@ -88,7 +88,7 @@ public void testOnSnapshotLoad() throws Exception {
String leaderId = "n0";
String peers = String.format("%s-localhost:%d", selfId, nextPort());

final DLedgerServer dLedgerServer = createDLedgerServer(group, peers, selfId, leaderId);
final DLedgerServer dLedgerServer = createDLedgerServerInStateMachineMode(group, peers, selfId, leaderId);
final Pair<StateMachineCaller, MockStateMachine> result = mockCaller(dLedgerServer);
final StateMachineCaller caller = result.getKey();
final MockStateMachine fsm = result.getValue();
Expand Down Expand Up @@ -124,7 +124,7 @@ public void doCallBack(SnapshotStatus status) {
caller.shutdown();
}

private DLedgerServer createDLedgerServer(String group, String peers, String selfId, String leaderId) {
private DLedgerServer createDLedgerServerInStateMachineMode(String group, String peers, String selfId, String leaderId) {
this.config = new DLedgerConfig();
this.config.group(group).selfId(selfId).peers(peers);
this.config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group);
Expand All @@ -134,6 +134,7 @@ private DLedgerServer createDLedgerServer(String group, String peers, String sel
this.config.setEnableLeaderElector(false);
this.config.setEnableDiskForceClean(false);
this.config.setDiskSpaceRatioToForceClean(0.90f);
this.config.setEnableSnapshot(true);
DLedgerServer dLedgerServer = new DLedgerServer(this.config);
MemberState memberState = dLedgerServer.getMemberState();
memberState.setCurrTermForTest(0);
Expand Down Expand Up @@ -176,9 +177,9 @@ private void updateFileStore(DLedgerMmapFileStore fileStore, int entryNum) {
public void testServerWithStateMachine() throws InterruptedException {
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort());
DLedgerServer dLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer dLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer dLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer dLedgerServer0 = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer dLedgerServer1 = launchServerWithStateMachineEnableSnapshot(group, peers, "n1", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer dLedgerServer2 = launchServerWithStateMachineEnableSnapshot(group, peers, "n2", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
final List<DLedgerServer> serverList = new ArrayList<DLedgerServer>() {
{
add(dLedgerServer0);
Expand Down

0 comments on commit 339149c

Please sign in to comment.