Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 15154 Added createSnapshot method to State API #15543

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hedera-node/hedera-app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ testModuleInfo {
requires("com.hedera.node.app.spi.test.fixtures")
requires("com.hedera.node.config.test.fixtures")
requires("com.swirlds.config.extensions.test.fixtures")
requires("com.swirlds.common.test.fixtures")
requires("com.swirlds.platform.core.test.fixtures")
requires("com.swirlds.state.api.test.fixtures")
requires("com.swirlds.base.test.fixtures")
requires("headlong")
requires("org.assertj.core")
requires("org.bouncycastle.provider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,40 @@
package com.hedera.node.app.state.merkle;

import static com.hedera.node.app.fixtures.AppTestBase.DEFAULT_CONFIG;
import static com.swirlds.platform.state.snapshot.SignedStateFileReader.readStateFileData;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.hedera.node.app.ids.WritableEntityIdStore;
import com.hedera.node.app.services.MigrationStateChanges;
import com.hedera.node.app.version.ServicesSoftwareVersion;
import com.hedera.node.config.data.HederaConfig;
import com.swirlds.base.test.fixtures.time.FakeTime;
import com.swirlds.common.config.StateCommonConfig_;
import com.swirlds.common.constructable.ClassConstructorPair;
import com.swirlds.common.constructable.ConstructableRegistryException;
import com.swirlds.common.constructable.RuntimeConstructable;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.crypto.CryptographyFactory;
import com.swirlds.common.crypto.config.CryptoConfig;
import com.swirlds.common.io.utility.LegacyTemporaryFileBuilder;
import com.swirlds.common.merkle.crypto.MerkleCryptographyFactory;
import com.swirlds.common.test.fixtures.platform.TestPlatformContextBuilder;
import com.swirlds.config.api.Configuration;
import com.swirlds.config.extensions.sources.SimpleConfigSource;
import com.swirlds.config.extensions.test.fixtures.TestConfigBuilder;
import com.swirlds.metrics.api.Metrics;
import com.swirlds.platform.config.StateConfig_;
import com.swirlds.platform.state.MerkleStateLifecycles;
import com.swirlds.platform.state.MerkleStateRoot;
import com.swirlds.platform.state.signed.SignedState;
import com.swirlds.platform.state.snapshot.SignedStateFileReader;
import com.swirlds.platform.state.snapshot.SignedStateFileUtils;
import com.swirlds.platform.system.InitTrigger;
import com.swirlds.platform.system.Platform;
import com.swirlds.platform.test.fixtures.state.MerkleTestBase;
import com.swirlds.platform.test.fixtures.state.RandomSignedStateGenerator;
import com.swirlds.platform.test.fixtures.state.TestSchema;
import com.swirlds.state.merkle.disk.OnDiskReadableKVState;
import com.swirlds.state.merkle.disk.OnDiskWritableKVState;
Expand Down Expand Up @@ -62,6 +78,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
Expand All @@ -79,6 +96,9 @@ class SerializationTest extends MerkleTestBase {
@Mock
private MigrationStateChanges migrationStateChanges;

@TempDir
Path tempDir;

@BeforeEach
void setUp() throws IOException {
setupConstructableRegistry();
Expand All @@ -90,6 +110,7 @@ void setUp() throws IOException {
.withValue(VirtualMapConfig_.COPY_FLUSH_THRESHOLD, 1 + ""))
.withConfigDataType(VirtualMapConfig.class)
.withConfigDataType(HederaConfig.class)
.withConfigDataType(CryptoConfig.class)
.getOrCreateConfig();
this.networkInfo = mock(NetworkInfo.class);
}
Expand Down Expand Up @@ -176,7 +197,7 @@ private void forceFlush(ReadableKVState<?, ?> state) {
@ValueSource(booleans = {true, false})
void simpleReadAndWrite(boolean forceFlush) throws IOException, ConstructableRegistryException {
final var schemaV1 = createV1Schema();
final var originalTree = createMerkleHederaState(schemaV1);
final var originalTree = (MerkleStateRoot) createMerkleHederaState(schemaV1);

// When we serialize it to bytes and deserialize it back into a tree
MerkleStateRoot copy = originalTree.copy(); // make a copy to make VM flushable
Expand All @@ -192,11 +213,45 @@ void simpleReadAndWrite(boolean forceFlush) throws IOException, ConstructableReg
serializedBytes = writeTree(originalTree, dir);
}

final MerkleStateRoot loadedTree = loadeMerkleTree(schemaV1, serializedBytes);
final MerkleStateRoot loadedTree = loadedMerkleTree(schemaV1, serializedBytes);

assertTree(loadedTree);
}

@Test
void snapshot() throws IOException {
final var schemaV1 = createV1Schema();
final var originalTree = createMerkleHederaState(schemaV1);
final var configBuilder = new TestConfigBuilder()
.withValue(StateConfig_.SIGNED_STATE_DISK, 1)
.withValue(
StateCommonConfig_.SAVED_STATE_DIRECTORY,
tempDir.toFile().toString());
final var cryptography = CryptographyFactory.create();
final var merkleCryptography = MerkleCryptographyFactory.create(config, cryptography);
final PlatformContext context = TestPlatformContextBuilder.create()
.withMerkleCryptography(merkleCryptography)
.withConfiguration(configBuilder.getOrCreateConfig())
.withTime(new FakeTime())
.build();

Platform mockPlatform = mock(Platform.class);
when(mockPlatform.getContext()).thenReturn(context);
originalTree.init(mockPlatform, InitTrigger.RESTART, new ServicesSoftwareVersion(schemaV1.getVersion()));

// prepare the tree and create a snapshot
originalTree.copy();
originalTree.computeHash();
originalTree.createSnapshot(tempDir);

final SignedStateFileReader.StateFileData deserializedSignedState = readStateFileData(
tempDir.resolve(SignedStateFileUtils.SIGNED_STATE_FILE_NAME), SignedStateFileUtils::readState);

MerkleStateRoot state = (MerkleStateRoot) deserializedSignedState.state();
initServices(schemaV1, state);
assertTree(state);
}

/**
* This test scenario is trickier, and it's designed to reproduce <a href="https://github.com/hashgraph/hedera-services/issues/13335">#13335: OnDiskKeySerializer uses wrong classId for OnDiskKey.</a>
* This issue can be reproduced only if at first it gets flushed to disk, then it gets loaded back in, and this time it remains in cache.
Expand All @@ -205,7 +260,7 @@ void simpleReadAndWrite(boolean forceFlush) throws IOException, ConstructableReg
@Test
void dualReadAndWrite() throws IOException, ConstructableRegistryException {
final var schemaV1 = createV1Schema();
final var originalTree = createMerkleHederaState(schemaV1);
final var originalTree = (MerkleStateRoot) createMerkleHederaState(schemaV1);

MerkleStateRoot copy = originalTree.copy(); // make a copy to make VM flushable

Expand All @@ -214,7 +269,7 @@ void dualReadAndWrite() throws IOException, ConstructableRegistryException {
CRYPTO.digestTreeSync(copy);
final byte[] serializedBytes = writeTree(copy, dir);

MerkleStateRoot loadedTree = loadeMerkleTree(schemaV1, serializedBytes);
MerkleStateRoot loadedTree = loadedMerkleTree(schemaV1, serializedBytes);
((OnDiskReadableKVState) originalTree.getReadableStates(FIRST_SERVICE).get(ANIMAL_STATE_KEY)).reset();
populateVmCache(loadedTree);

Expand All @@ -226,19 +281,16 @@ void dualReadAndWrite() throws IOException, ConstructableRegistryException {
final byte[] serializedBytesWithCache = writeTree(loadedTree, dir);

// let's load it again and see if it works
MerkleStateRoot loadedTreeWithCache = loadeMerkleTree(schemaV1, serializedBytesWithCache);
MerkleStateRoot loadedTreeWithCache = loadedMerkleTree(schemaV1, serializedBytesWithCache);
((OnDiskReadableKVState)
loadedTreeWithCache.getReadableStates(FIRST_SERVICE).get(ANIMAL_STATE_KEY))
.reset();

assertTree(loadedTreeWithCache);
}

private MerkleStateRoot loadeMerkleTree(Schema schemaV1, byte[] serializedBytes)
private MerkleStateRoot loadedMerkleTree(Schema schemaV1, byte[] serializedBytes)
throws ConstructableRegistryException, IOException {
final var newRegistry =
new MerkleSchemaRegistry(registry, FIRST_SERVICE, DEFAULT_CONFIG, new SchemaApplications());
newRegistry.register(schemaV1);

// Register the MerkleStateRoot so, when found in serialized bytes, it will register with
// our migration callback, etc. (normally done by the Hedera main method)
Expand All @@ -248,6 +300,15 @@ private MerkleStateRoot loadeMerkleTree(Schema schemaV1, byte[] serializedBytes)
registry.registerConstructable(pair);

final MerkleStateRoot loadedTree = parseTree(serializedBytes, dir);
initServices(schemaV1, loadedTree);

return loadedTree;
}

private void initServices(Schema schemaV1, MerkleStateRoot loadedTree) {
final var newRegistry =
new MerkleSchemaRegistry(registry, FIRST_SERVICE, DEFAULT_CONFIG, new SchemaApplications());
newRegistry.register(schemaV1);
newRegistry.migrate(
loadedTree,
schemaV1.getVersion(),
Expand All @@ -259,12 +320,13 @@ private MerkleStateRoot loadeMerkleTree(Schema schemaV1, byte[] serializedBytes)
new HashMap<>(),
migrationStateChanges);
loadedTree.migrate(MerkleStateRoot.CURRENT_VERSION);

return loadedTree;
}

private MerkleStateRoot createMerkleHederaState(Schema schemaV1) {
final var originalTree = new MerkleStateRoot(lifecycles, version -> new ServicesSoftwareVersion(version, 0));
final SignedState randomState =
new RandomSignedStateGenerator().setRound(1).build();

final var originalTree = (MerkleStateRoot) randomState.getState();
final var originalRegistry =
new MerkleSchemaRegistry(registry, FIRST_SERVICE, DEFAULT_CONFIG, new SchemaApplications());
originalRegistry.register(schemaV1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
*/
public class SerializableDataInputStream extends AugmentedDataInputStream {

private static final int PROTOCOL_VERSION = SERIALIZATION_PROTOCOL_VERSION;
lpetrovic05 marked this conversation as resolved.
Show resolved Hide resolved
private static final Set<Integer> SUPPORTED_PROTOCOL_VERSIONS = Set.of(SERIALIZATION_PROTOCOL_VERSION);

/** A stream used to read PBJ objects */
private final ReadableSequentialData readableSequentialData;
Expand All @@ -70,16 +70,17 @@
}

/**
* Reads the protocol version written by {@link SerializableDataOutputStream#writeProtocolVersion()} and saves it
* internally. From this point on, it will use this version number to deserialize.
* Reads the protocol version written by {@link SerializableDataOutputStream#writeProtocolVersion()}
* From this point on, it will use this version number to deserialize.
*
* @throws IOException thrown if any IO problems occur
*/
public void readProtocolVersion() throws IOException {
public int readProtocolVersion() throws IOException {
final int protocolVersion = readInt();
if (protocolVersion != PROTOCOL_VERSION) {
throw new IOException("invalid protocol version " + protocolVersion);
if (!SUPPORTED_PROTOCOL_VERSIONS.contains(protocolVersion)) {
throw new IOException("Unsupported protocol version " + protocolVersion);

Check warning on line 81 in platform-sdk/swirlds-common/src/main/java/com/swirlds/common/io/streams/SerializableDataInputStream.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-common/src/main/java/com/swirlds/common/io/streams/SerializableDataInputStream.java#L81

Added line #L81 was not covered by tests
}
return protocolVersion;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@
exports com.swirlds.common.crypto.internal to
com.swirlds.platform.core,
com.swirlds.common.test.fixtures,
com.swirlds.common.testing;
com.swirlds.common.testing,
com.swirlds.platform.core.test.fixtures;
exports com.swirlds.common.notification.internal to
com.swirlds.common.testing;
exports com.swirlds.common.crypto.engine to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.swirlds.common.merkle.MerkleInternal;
import com.swirlds.platform.system.SwirldState;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.nio.file.Path;

/**
* This interface represents the root node of the Merkle tree.
Expand Down Expand Up @@ -68,4 +69,9 @@ public interface MerkleRoot extends MerkleInternal {
/** {@inheritDoc} */
@NonNull
MerkleRoot copy();

/* Creates a snapshots for the state. The state has to be hashed and immutable before calling this method.
* @param targetPath The path to save the snapshot.
*/
void createSnapshot(final Path targetPath);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.platform.state;

import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.metrics.RunningAverageMetric;
import com.swirlds.metrics.api.Metrics;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
* This class encapsulates metrics for the Merkle root snapshot.
*/
public class MerkleRootSnapshotMetrics {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class doc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

private static final RunningAverageMetric.Config WRITE_MERKLE_ROOT_TO_DISK_TIME_CONFIG =
new RunningAverageMetric.Config("platform", "writeMerkleRootToDisk")
.withDescription("average time it takes to write a Merkle tree to disk (in milliseconds)")
.withUnit("ms");

private final RunningAverageMetric writeMerkleRootToDiskTime;
/**
* Constructor.
*
* @param platformContext the platform context
*/
public MerkleRootSnapshotMetrics(@NonNull final PlatformContext platformContext) {
final Metrics metrics = platformContext.getMetrics();
writeMerkleRootToDiskTime = metrics.getOrCreate(WRITE_MERKLE_ROOT_TO_DISK_TIME_CONFIG);
}

/**
* Get a metric tracking the average time required to write a Merkle tree to disk.
*
* @return the metric tracking the average time required to write a Merkle tree to disk
*/
@NonNull
public RunningAverageMetric getWriteStateToDiskTimeMetric() {
return writeMerkleRootToDiskTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -93,6 +94,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -144,13 +146,20 @@

private final Function<SemanticVersion, SoftwareVersion> versionFactory;
private MerkleCryptography merkleCryptography;
private PlatformContext platformContext;
private Platform platform;

public Map<String, Map<String, StateMetadata<?, ?>>> getServices() {
return services;
}

private Metrics metrics;

/**
* Metrics for the snapshot creation process
*/
private MerkleRootSnapshotMetrics snapshotMetrics;

/**
* Maintains information about each service, and each state of each service, known by this
* instance. The key is the "service-name.state-key".
Expand Down Expand Up @@ -235,8 +244,11 @@
@NonNull final Platform platform,
@NonNull final InitTrigger trigger,
@Nullable final SoftwareVersion deserializedVersion) {
metrics = platform.getContext().getMetrics();
merkleCryptography = platform.getContext().getMerkleCryptography();
this.platform = platform;
platformContext = this.platform.getContext();
metrics = platformContext.getMetrics();
merkleCryptography = platformContext.getMerkleCryptography();
snapshotMetrics = new MerkleRootSnapshotMetrics(platformContext);

// If we are initialized for event stream recovery, we have to register an
// extra listener to make sure we call all the required Hedera lifecycles
Expand Down Expand Up @@ -1082,4 +1094,20 @@
Thread.currentThread().interrupt();
}
}

/**
* {@inheritDoc}
*/
@Override
public void createSnapshot(@NonNull Path targetPath) {
throwIfMutable();
throwIfDestroyed();

Check warning on line 1104 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/MerkleStateRoot.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/state/MerkleStateRoot.java#L1100-L1104

Added lines #L1100 - L1104 were not covered by tests
final long start = System.nanoTime();
MerkleTreeSnapshotWriter.createSnapshot(this, targetPath);
if (snapshotMetrics != null) {
snapshotMetrics
.getWriteStateToDiskTimeMetric()
.update(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
}
}
Loading
Loading