Skip to content

Commit

Permalink
Revert "Skip refresh for unused segments in metadata cache (#16990)"
Browse files Browse the repository at this point in the history
This reverts commit b0e07bf.
  • Loading branch information
cecemei committed Sep 14, 2024
1 parent 2e1adcc commit 028cf80
Showing 1 changed file with 0 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2443,102 +2443,6 @@ public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToR
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
}

@Test
public void testUnusedSegmentIsNotRefreshed() throws InterruptedException, IOException
{
String dataSource = "xyz";
CountDownLatch latch = new CountDownLatch(1);
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
) {
@Override
public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToRebuild)
throws IOException
{
super.refresh(segmentsToRefresh, dataSourcesToRebuild);
latch.countDown();
}
};

List<DataSegment> segments = ImmutableList.of(
newSegment(dataSource, 1),
newSegment(dataSource, 2),
newSegment(dataSource, 3)
);

final DruidServer historicalServer = druidServers.stream()
.filter(s -> s.getType().equals(ServerType.HISTORICAL))
.findAny()
.orElse(null);

Assert.assertNotNull(historicalServer);
final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();

ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new ImmutableMap.Builder<>();
segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, "fp"));
segmentStatsMap.put(segments.get(1).getId(), new SegmentMetadata(20L, "fp"));
segmentStatsMap.put(segments.get(2).getId(), new SegmentMetadata(20L, "fp"));

ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new ImmutableMap.Builder<>();
schemaPayloadMap.put("fp", new SchemaPayload(RowSignature.builder().add("c1", ColumnType.DOUBLE).build()));
segmentSchemaCache.updateFinalizedSegmentSchema(
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
);

schema.addSegment(historicalServerMetadata, segments.get(0));
schema.addSegment(historicalServerMetadata, segments.get(1));
schema.addSegment(historicalServerMetadata, segments.get(2));

serverView.addSegment(segments.get(0), ServerType.HISTORICAL);
serverView.addSegment(segments.get(1), ServerType.HISTORICAL);
serverView.addSegment(segments.get(2), ServerType.HISTORICAL);

schema.onLeaderStart();
schema.awaitInitialization();

Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));

// make segment3 unused
segmentStatsMap = new ImmutableMap.Builder<>();
segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, "fp"));

segmentSchemaCache.updateFinalizedSegmentSchema(
new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build())
);

Map<SegmentId, DataSegment> segmentMap = new HashMap<>();
segmentMap.put(segments.get(0).getId(), segments.get(0));
segmentMap.put(segments.get(1).getId(), segments.get(1));

ImmutableDruidDataSource druidDataSource =
new ImmutableDruidDataSource(
"xyz",
Collections.emptyMap(),
segmentMap
);

Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(ArgumentMatchers.anyString()))
.thenReturn(druidDataSource);

Set<SegmentId> segmentsToRefresh = segments.stream().map(DataSegment::getId).collect(Collectors.toSet());
segmentsToRefresh.remove(segments.get(1).getId());
segmentsToRefresh.remove(segments.get(2).getId());

schema.refresh(segmentsToRefresh, Sets.newHashSet(dataSource));

Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));
}

private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
Expand Down

0 comments on commit 028cf80

Please sign in to comment.