Skip to content

Commit

Permalink
[FLINK-35872][table] Fix the incorrect partition generation for mater…
Browse files Browse the repository at this point in the history
…ialized table period refresh in Full Mode

This closes #25108

(cherry picked from commit af7d2b3)
  • Loading branch information
hackergin authored and lsyldliu committed Jul 22, 2024
1 parent eaeface commit a326854
Show file tree
Hide file tree
Showing 4 changed files with 388 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
Expand Down Expand Up @@ -102,8 +103,9 @@
import static org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig;
import static org.apache.flink.table.gateway.service.utils.Constants.CLUSTER_INFO;
import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
import static org.apache.flink.table.utils.DateTimeUtils.formatTimestampString;
import static org.apache.flink.table.utils.DateTimeUtils.formatTimestampStringWithOffset;
import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron;
import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration;

/** Manager is responsible for execute the {@link MaterializedTableOperation}. */
@Internal
Expand Down Expand Up @@ -615,6 +617,7 @@ public ResultFetcher refreshMaterializedTable(
isPeriodic
? getPeriodRefreshPartition(
scheduleTime,
materializedTable.getDefinitionFreshness(),
materializedTableIdentifier,
materializedTable.getOptions(),
operationExecutor
Expand Down Expand Up @@ -686,13 +689,14 @@ public ResultFetcher refreshMaterializedTable(
@VisibleForTesting
static Map<String, String> getPeriodRefreshPartition(
String scheduleTime,
IntervalFreshness freshness,
ObjectIdentifier materializedTableIdentifier,
Map<String, String> materializedTableOptions,
ZoneId localZoneId) {
if (scheduleTime == null) {
throw new ValidationException(
String.format(
"Scheduler time not properly set for periodic refresh of materialized table %s.",
"The scheduler time must not be null during the periodic refresh of the materialized table %s.",
materializedTableIdentifier));
}

Expand All @@ -707,12 +711,14 @@ static Map<String, String> getPeriodRefreshPartition(
PARTITION_FIELDS.length() + 1,
partKey.length() - (DATE_FORMATTER.length() + 1));
String partFieldFormatter = materializedTableOptions.get(partKey);

String partFiledValue =
formatTimestampString(
formatTimestampStringWithOffset(
scheduleTime,
SCHEDULE_TIME_DATE_FORMATTER_DEFAULT,
partFieldFormatter,
TimeZone.getTimeZone(localZoneId));
TimeZone.getTimeZone(localZoneId),
-convertFreshnessToDuration(freshness).toMillis());
if (partFiledValue == null) {
throw new SqlExecutionException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1392,22 +1392,21 @@ void testPeriodicRefreshMaterializedTableWithoutPartitionOptions() throws Except
@Test
void testPeriodicRefreshMaterializedTableWithPartitionOptions() throws Exception {
List<Row> data = new ArrayList<>();
data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
data.add(Row.of(2L, 2L, 2L, "2024-01-02"));

// create materialized table with partition formatter
createAndVerifyCreateMaterializedTableWithData(
"my_materialized_table",
data,
Collections.singletonMap("ds", "yyyy-MM-dd"),
RefreshMode.CONTINUOUS);
RefreshMode.FULL);

ObjectIdentifier materializedTableIdentifier =
ObjectIdentifier.of(
fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table");

// add more data to all data list
data.add(Row.of(3L, 3L, 3L, "2024-01-01"));
data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
data.add(Row.of(4L, 4L, 4L, "2024-01-02"));

// refresh the materialized table with period schedule
Expand All @@ -1417,7 +1416,7 @@ void testPeriodicRefreshMaterializedTableWithPartitionOptions() throws Exception
sessionHandle,
materializedTableIdentifier.asSerializableString(),
true,
"2024-01-02 00:00:00",
"2024-01-03 00:00:00",
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap());
Expand Down Expand Up @@ -1485,7 +1484,7 @@ void testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws E
.isInstanceOf(ValidationException.class)
.hasMessage(
String.format(
"Scheduler time not properly set for periodic refresh of materialized table %s.",
"The scheduler time must not be null during the periodic refresh of the materialized table %s.",
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
Expand Down
Loading

0 comments on commit a326854

Please sign in to comment.