Skip to content

Commit

Permalink
fix checksum issues
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <bansvaru@amazon.com>
  • Loading branch information
linuxpi committed Sep 3, 2023
1 parent 7a6cd4e commit d3ba955
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StreamContext;
Expand All @@ -19,11 +21,13 @@
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.zip.CRC32;

import com.jcraft.jzlib.JZlib;

Expand Down Expand Up @@ -244,4 +248,18 @@ public void close() throws IOException {
throw new IOException("Closure of some of the multi-part streams failed.");
}
}

/**
 * Derives a single combined checksum for an {@link IndexInput} from the checksum stored in its footer.
 * <p>
 * The stored checksum is read via {@link CodecUtil#retrieveChecksum(IndexInput)}, a CRC32 is computed
 * over the byte representation of that stored value, and the two CRCs are merged with
 * {@code JZlib.crc32_combine}, using {@code checksumBytesLength} as the length of the second segment.
 * <p>
 * NOTE(review): reading the footer presumably moves the read position of {@code indexInput}; callers
 * that still need the original position should confirm this and pass a clone if so.
 *
 * @param indexInput IndexInput that carries its checksum in the footer
 * @param checksumBytesLength number of bytes occupied by the stored checksum; passed to the combine step
 *                            as the length of the data covered by the second CRC
 * @return the combined checksum of the stored checksum and the CRC32 of the stored checksum's bytes
 * @throws IOException if the stored checksum cannot be retrieved from {@code indexInput}
 */
public static long checksumOfChecksum(IndexInput indexInput, int checksumBytesLength) throws IOException {
    // Checksum recorded in the footer of the input.
    final long footerChecksum = CodecUtil.retrieveChecksum(indexInput);

    // CRC32 over the bytes of the footer checksum itself.
    final byte[] footerChecksumBytes = ByteUtils.toByteArrayBE(footerChecksum);
    final CRC32 footerCrc = new CRC32();
    footerCrc.update(footerChecksumBytes);
    final long crcOfFooterChecksum = footerCrc.getValue();

    // Merge both CRCs; the third argument is the byte length of the segment behind the second CRC.
    return JZlib.crc32_combine(footerChecksum, crcOfFooterChecksum, checksumBytesLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
allUploadedIndexMetadata.remove(removedIndexName);
}
final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
allUploadedIndexMetadata.values().stream().collect(Collectors.toList()),
false
);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), false);
final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
Expand All @@ -29,7 +28,6 @@
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.store.exception.ChecksumCombinationException;

Expand All @@ -47,9 +45,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.zip.CRC32;

import com.jcraft.jzlib.JZlib;
import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum;

/**
* A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store.
Expand Down Expand Up @@ -380,11 +377,8 @@ private void uploadBlob(

private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException {
try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) {
long storedChecksum = CodecUtil.retrieveChecksum(indexInput);
CRC32 checksumOfChecksum = new CRC32();
checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum));
try {
return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES);
return checksumOfChecksum(indexInput, SEGMENT_CHECKSUM_BYTES);
} catch (Exception e) {
throw new ChecksumCombinationException(
"Potentially corrupted file: Checksum combination failed while combining stored checksum "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.CorruptStateException;
import org.opensearch.index.store.exception.ChecksumCombinationException;
import org.opensearch.snapshots.SnapshotInfo;

import java.io.IOException;
Expand All @@ -72,6 +73,8 @@
import java.util.Locale;
import java.util.Map;

import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum;

/**
* Snapshot metadata file format used in v2.0 and above
*
Expand Down Expand Up @@ -196,7 +199,19 @@ public void writeAsync(
final String blobName = blobName(name);
final BytesReference bytes = serialize(obj, blobName, compressor);

IndexInput input = new ByteArrayIndexInput("", BytesReference.toBytes(bytes));
IndexInput input = new ByteArrayIndexInput("ChecksumBlobStoreFormat", BytesReference.toBytes(bytes));

long expectedChecksum;
try {
expectedChecksum = checksumOfChecksum(input.clone(), 8);
} catch (Exception e) {
throw new ChecksumCombinationException(
"Potentially corrupted file: Checksum combination failed while combining stored checksum "
+ "and calculated checksum of stored checksum in stream",
"ChecksumBlobStoreFormat",
e
);
}

RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
Expand All @@ -205,7 +220,7 @@ public void writeAsync(
true,
WritePriority.HIGH,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
CodecUtil.checksumEntireFile(input),
expectedChecksum,
true
);

Expand Down

0 comments on commit d3ba955

Please sign in to comment.