Skip to content

Commit

Permalink
Wrap uploadMetadata implementation with synchronized
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Aug 12, 2022
1 parent 02bc44d commit 2470f4a
Showing 1 changed file with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private Map<String, UploadedSegmentMetadata> readLatestMetadataFile() throws IOE
logger.info("Reading latest Metadata file {}", latestMetadataFile.get());
segmentMetadataMap = readMetadataFile(latestMetadataFile.get());
} else {
logger.info("No metadata file found");
logger.info("No metadata file found, this can happen for new index with no data uploaded to remote segment store");
}

return segmentMetadataMap;
Expand Down Expand Up @@ -191,7 +191,8 @@ public int compare(String first, String second) {
}
}

public static String getMetadataFilename(long primaryTerm, long generation, String uuid) {
// Visible for testing
static String getMetadataFilename(long primaryTerm, long generation, String uuid) {
return String.join(
SEPARATOR,
METADATA_PREFIX,
Expand All @@ -201,15 +202,18 @@ public static String getMetadataFilename(long primaryTerm, long generation, Stri
);
}

public static long getPrimaryTerm(String[] filenameTokens) {
// Visible for testing
static long getPrimaryTerm(String[] filenameTokens) {
return Long.parseLong(filenameTokens[1]);
}

public static long getGeneration(String[] filenameTokens) {
// Visible for testing
static long getGeneration(String[] filenameTokens) {
return Long.parseLong(filenameTokens[2], Character.MAX_RADIX);
}

public static String getUuid(String[] filenameTokens) {
// Visible for testing
static String getUuid(String[] filenameTokens) {
return filenameTokens[3];
}
}
Expand Down Expand Up @@ -320,21 +324,23 @@ public boolean containsFile(String localFilename, String checksum) {
*/
public void uploadMetadata(Collection<String> segmentFiles, Directory storeDirectory, long primaryTerm, long generation)
throws IOException {
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix);
IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
if (segmentsUploadedToRemoteStore.containsKey(file)) {
uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
} else {
throw new NoSuchFileException(file);
synchronized (this) {
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix);
IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
if (segmentsUploadedToRemoteStore.containsKey(file)) {
uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
} else {
throw new NoSuchFileException(file);
}
}
indexOutput.writeMapOfStrings(uploadedSegments);
indexOutput.close();
storeDirectory.sync(Collections.singleton(metadataFilename));
remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
storeDirectory.deleteFile(metadataFilename);
}
indexOutput.writeMapOfStrings(uploadedSegments);
indexOutput.close();
storeDirectory.sync(Collections.singleton(metadataFilename));
remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
storeDirectory.deleteFile(metadataFilename);
}

private String getChecksumOfLocalFile(Directory directory, String file) throws IOException {
Expand Down

0 comments on commit 2470f4a

Please sign in to comment.