From d3ba955e3713a620adef6033dc6c5b51d5422fb9 Mon Sep 17 00:00:00 2001 From: bansvaru Date: Mon, 4 Sep 2023 00:15:50 +0530 Subject: [PATCH] fix checksum issues Signed-off-by: bansvaru --- .../transfer/RemoteTransferContainer.java | 18 ++++++++++++++++++ .../remote/RemoteClusterStateService.java | 6 +----- .../index/store/RemoteDirectory.java | 10 ++-------- .../blobstore/ChecksumBlobStoreFormat.java | 19 +++++++++++++++++-- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index c7cfef5c5ce3d..736feb7c732d9 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IndexInput; import org.opensearch.common.CheckedTriFunction; import org.opensearch.common.SetOnce; import org.opensearch.common.StreamContext; @@ -19,11 +21,13 @@ import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream; import org.opensearch.common.io.InputStreamContainer; +import org.opensearch.common.util.ByteUtils; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.util.Objects; +import java.util.zip.CRC32; import com.jcraft.jzlib.JZlib; @@ -244,4 +248,18 @@ public void close() throws IOException { throw new IOException("Closure of some of the multi-part streams failed."); } } + + /** + * Compute final checksum for IndexInput + * @param indexInput IndexInput with checksum in footer + * @param checksumBytesLength length of checksum bytes + * @return final computed checksum of entire indexInput + * @throws IOException + */ + public static long checksumOfChecksum(IndexInput indexInput, int checksumBytesLength) throws IOException { + long storedChecksum = CodecUtil.retrieveChecksum(indexInput); + CRC32 checksumOfChecksum = new CRC32(); + checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); + return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), checksumBytesLength); + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 0cd693d86e4f6..af5838098caf8 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -220,11 +220,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) { allUploadedIndexMetadata.remove(removedIndexName); } - final ClusterMetadataManifest manifest = uploadManifest( - clusterState, - allUploadedIndexMetadata.values().stream().collect(Collectors.toList()), - false - ); + final ClusterMetadataManifest manifest = uploadManifest(clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), false); final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis; if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 04b5d7eb7c6bd..67b8ce55be353 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; @@ -29,7 +28,6 @@ import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; -import org.opensearch.common.util.ByteUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.index.store.exception.ChecksumCombinationException; @@ -47,9 +45,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import java.util.zip.CRC32; -import com.jcraft.jzlib.JZlib; +import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum; /** * A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store. @@ -380,11 +377,8 @@ private void uploadBlob( private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) { - long storedChecksum = CodecUtil.retrieveChecksum(indexInput); - CRC32 checksumOfChecksum = new CRC32(); - checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); try { - return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES); + return checksumOfChecksum(indexInput, SEGMENT_CHECKSUM_BYTES); } catch (Exception e) { throw new ChecksumCombinationException( "Potentially corrupted file: Checksum combination failed while combining stored checksum " diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 28d01256ec3e3..1d7b507a40628 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -63,6 +63,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.gateway.CorruptStateException; +import org.opensearch.index.store.exception.ChecksumCombinationException; import org.opensearch.snapshots.SnapshotInfo; import java.io.IOException; @@ -72,6 +73,8 @@ import java.util.Locale; import java.util.Map; +import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum; + /** * Snapshot metadata file format used in v2.0 and above * @@ -196,7 +199,19 @@ public void writeAsync( final String blobName = blobName(name); final BytesReference bytes = serialize(obj, blobName, compressor); - IndexInput input = new ByteArrayIndexInput("", BytesReference.toBytes(bytes)); + IndexInput input = new ByteArrayIndexInput("ChecksumBlobStoreFormat", BytesReference.toBytes(bytes)); + + long expectedChecksum; + try { + expectedChecksum = checksumOfChecksum(input.clone(), 8); + } catch (Exception e) { + throw new ChecksumCombinationException( + "Potentially corrupted file: Checksum combination failed while combining stored checksum " + + "and calculated checksum of stored checksum in stream", + "ChecksumBlobStoreFormat", + e + ); + } RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( blobName, @@ -205,7 +220,7 @@ public void writeAsync( true, WritePriority.HIGH, (size, position) -> new OffsetRangeIndexInputStream(input, size, position), - CodecUtil.checksumEntireFile(input), + expectedChecksum, true );