Normalize schema fingerprint for column permutations (#17044)
Parent issue: #14989

The order of columns can vary across segments, especially during realtime ingestion.
Since the schema fingerprint is sensitive to column order, this leads to the creation of a large number of segment schemas in the metadata database for essentially the same set of columns.

This is wasteful. This patch fixes the problem by computing the schema fingerprint over lexicographically sorted columns, so that all permutations of a given set of columns result in a single schema in the metadata database, stored with the first observed column order for that signature.
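
For illustration, here is a minimal sketch (not part of the diff below) of the intended behavior. It assumes a configured Jackson ObjectMapper named objectMapper; the classes and method signatures are the ones touched by this change:

    FingerprintGenerator generator = new FingerprintGenerator(objectMapper);

    // Two signatures containing the same columns in different orders.
    RowSignature sigA = RowSignature.builder()
                                    .add("d7", ColumnType.LONG_ARRAY)
                                    .add("b1", ColumnType.FLOAT)
                                    .build();
    RowSignature sigB = RowSignature.builder()
                                    .add("b1", ColumnType.FLOAT)
                                    .add("d7", ColumnType.LONG_ARRAY)
                                    .build();

    // With this patch both payloads hash to the same fingerprint, so the metadata
    // database ends up with a single schema row for this set of columns.
    String fingerprintA = generator.generateFingerprint(new SchemaPayload(sigA), "ds", 0);
    String fingerprintB = generator.generateFingerprint(new SchemaPayload(sigB), "ds", 0);
    // fingerprintA.equals(fingerprintB) is now true; before this change it was not.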
findingrish committed Sep 18, 2024
1 parent 2f50138 commit 43d790f
Showing 3 changed files with 198 additions and 10 deletions.
@@ -20,6 +20,7 @@
package org.apache.druid.segment.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
@@ -30,11 +31,17 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Utility to generate fingerprint for an object.
* Utility to generate schema fingerprint which is used to ensure schema uniqueness in the metadata database.
* Note that the generated fingerprint is independent of the column order.
*/
@LazySingleton
public class FingerprintGenerator
@@ -53,12 +60,20 @@ public FingerprintGenerator(ObjectMapper objectMapper)
* Generates fingerprint or hash string for an object using SHA-256 hash algorithm.
*/
@SuppressWarnings("UnstableApiUsage")
public String generateFingerprint(SchemaPayload schemaPayload, String dataSource, int version)
public String generateFingerprint(final SchemaPayload schemaPayload, final String dataSource, final int version)
{
// Sort the column names in lexicographic order
// The aggregator factories are column-order independent since they are stored in a hash map
// This ensures that all permutations of a given set of columns result in the same fingerprint,
// thus avoiding schema explosion in the metadata database
// Note that this sorted signature is not persisted anywhere; it is only used for fingerprint computation
final RowSignature sortedSignature = getLexicographicallySortedSignature(schemaPayload.getRowSignature());
final SchemaPayload updatedPayload = new SchemaPayload(sortedSignature, schemaPayload.getAggregatorFactories());
try {

final Hasher hasher = Hashing.sha256().newHasher();

hasher.putBytes(objectMapper.writeValueAsBytes(schemaPayload));
hasher.putBytes(objectMapper.writeValueAsBytes(updatedPayload));
// add delimiter, inspired from org.apache.druid.metadata.PendingSegmentRecord.computeSequenceNamePrevIdSha1
hasher.putByte((byte) 0xff);

@@ -82,4 +97,21 @@ public String generateFingerprint(SchemaPayload schemaPayload, String dataSource
);
}
}

@VisibleForTesting
protected RowSignature getLexicographicallySortedSignature(final RowSignature rowSignature)
{
final List<String> columns = new ArrayList<>(rowSignature.getColumnNames());

Collections.sort(columns);

final RowSignature.Builder sortedSignature = RowSignature.builder();

for (String column : columns) {
ColumnType type = rowSignature.getColumnType(column).orElse(null);
sortedSignature.add(column, type);
}

return sortedSignature.build();
}
}
@@ -19,6 +19,7 @@

package org.apache.druid.metadata;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -48,6 +49,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -281,6 +283,112 @@ public void testAnnounceHistoricalSegments() throws IOException
segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap);
}

@Test
public void testSchemaPermutation() throws JsonProcessingException
{
Set<DataSegment> segments = new HashSet<>();
SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
// Store the first observed column order for each segment for verification purposes
Map<String, Pair<SchemaPayload, Integer>> segmentIdSchemaMap = new HashMap<>();

RowSignature originalOrder =
RowSignature.builder()
.add("d7", ColumnType.LONG_ARRAY)
.add("b1", ColumnType.FLOAT)
.add("a5", ColumnType.DOUBLE)
.build();

// column permutations
List<List<String>> permutations = Arrays.asList(
Arrays.asList("d7", "a5", "b1"),
Arrays.asList("a5", "b1", "d7"),
Arrays.asList("a5", "d7", "b1"),
Arrays.asList("b1", "d7", "a5"),
Arrays.asList("b1", "a5", "d7"),
Arrays.asList("d7", "a5", "b1")
);

boolean first = true;

Random random = ThreadLocalRandom.current();
Random permutationRandom = ThreadLocalRandom.current();

for (int i = 0; i < 105; i++) {
DataSegment segment = new DataSegment(
"fooDataSource",
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
"version",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(i),
9,
100
);
segments.add(segment);

int randomNum = random.nextInt();

RowSignature rowSignature;

if (first) {
rowSignature = originalOrder;
} else {
RowSignature.Builder builder = RowSignature.builder();
List<String> columns = permutations.get(permutationRandom.nextInt(permutations.size()));

for (String column : columns) {
builder.add(column, originalOrder.getColumnType(column).get());
}

rowSignature = builder.build();
}

SchemaPayload schemaPayload = new SchemaPayload(rowSignature);
segmentIdSchemaMap.put(segment.getId().toString(), Pair.of(new SchemaPayload(originalOrder), randomNum));
segmentSchemaMapping.addSchema(
segment.getId(),
new SchemaPayloadPlus(schemaPayload, (long) randomNum),
fingerprintGenerator.generateFingerprint(
schemaPayload,
segment.getDataSource(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
)
);

if (first) {
coordinator.commitSegments(segments, segmentSchemaMapping);
first = false;
}
}

coordinator.commitSegments(segments, segmentSchemaMapping);
for (DataSegment segment : segments) {
Assert.assertArrayEquals(
mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8),
derbyConnector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
"id",
"payload",
segment.getId().toString()
)
);
}

List<String> segmentIds = segments.stream()
.map(segment -> segment.getId().toString())
.sorted(Comparator.naturalOrder())
.collect(Collectors.toList());

Assert.assertEquals(segmentIds, retrieveUsedSegmentIds(derbyConnectorRule.metadataTablesConfigSupplier().get()));

// Should not update dataSource metadata.
Assert.assertEquals(0, metadataUpdateCounter.get());

// verify that only a single schema is created
segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap);
}

@Test
public void testAnnounceHistoricalSegments_schemaExists() throws IOException
{
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
import org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.TestHelper;
@@ -30,7 +31,9 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FingerprintGeneratorTest
@@ -45,13 +48,20 @@ public class FingerprintGeneratorTest
@Test
public void testGenerateFingerprint_precalculatedHash()
{
RowSignature rowSignature = RowSignature.builder().add("c1", ColumnType.FLOAT).build();
RowSignature rowSignature =
RowSignature.builder()
.add("c1", ColumnType.LONG)
.add("c0", ColumnType.STRING)
.add("c2", ColumnType.FLOAT)
.add("c3", ColumnType.DOUBLE)
.build();
Map<String, AggregatorFactory> aggregatorFactoryMap = new HashMap<>();
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null));
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "c1", null));
aggregatorFactoryMap.put("stringAny", new StringAnyAggregatorFactory("stringAny", "c0", 1024, true));

SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap);

String expected = "DEE5E8F59833102F0FA5B10F8B8884EA15220D1D2A5F6097A93D8309132E1039";
String expected = "82E774457D26D0B8D481B6C39872070B25EA3C72C6EFC107B346FA42641740E1";
Assert.assertEquals(expected, fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0));
}

@@ -60,25 +70,38 @@ public void testGenerateFingerprint_columnPermutation()
{
RowSignature rowSignature =
RowSignature.builder()
.add("c1", ColumnType.FLOAT)
.add("c2", ColumnType.LONG)
.add("c1", ColumnType.FLOAT)
.add("c3", ColumnType.DOUBLE)
.add("c0", ColumnType.STRING)
.build();

Map<String, AggregatorFactory> aggregatorFactoryMap = new HashMap<>();
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null));
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "c2", null));
aggregatorFactoryMap.put("stringAny", new StringAnyAggregatorFactory("stringAny", "c0", 1024, true));

SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap);

RowSignature rowSignaturePermutation =
RowSignature.builder()
.add("c2", ColumnType.LONG)
.add("c0", ColumnType.STRING)
.add("c3", ColumnType.DOUBLE)
.add("c1", ColumnType.FLOAT)
.build();

SchemaPayload schemaPayloadNew = new SchemaPayload(rowSignaturePermutation, aggregatorFactoryMap);
Assert.assertNotEquals(
Map<String, AggregatorFactory> aggregatorFactoryMapForPermutation = new HashMap<>();
aggregatorFactoryMapForPermutation.put(
"stringAny",
new StringAnyAggregatorFactory("stringAny", "c0", 1024, true)
);
aggregatorFactoryMapForPermutation.put(
"longFirst",
new LongFirstAggregatorFactory("longFirst", "c2", null)
);

SchemaPayload schemaPayloadNew = new SchemaPayload(rowSignaturePermutation, aggregatorFactoryMapForPermutation);
Assert.assertEquals(
fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0),
fingerprintGenerator.generateFingerprint(schemaPayloadNew, "ds", 0)
);
@@ -125,4 +148,29 @@ public void testGenerateFingerprint_differentVersion()
fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 1)
);
}

@Test
public void testRowSignatureIsSorted()
{
RowSignature rowSignature =
RowSignature.builder()
.add("c5", ColumnType.STRING)
.add("c1", ColumnType.FLOAT)
.add("b2", ColumnType.LONG)
.add("d3", ColumnType.DOUBLE)
.add("a1", ColumnType.STRING)
.build();

RowSignature sortedSignature = fingerprintGenerator.getLexicographicallySortedSignature(rowSignature);

Assert.assertNotEquals(rowSignature, sortedSignature);

List<String> columnNames = sortedSignature.getColumnNames();
List<String> sortedOrder = Arrays.asList("a1", "b2", "c1", "c5", "d3");
Assert.assertEquals(sortedOrder, columnNames);

for (String column : sortedOrder) {
Assert.assertEquals(sortedSignature.getColumnType(column), rowSignature.getColumnType(column));
}
}
}
