From a3268544636ee120dd6cbed1ba429aafdca2909a Mon Sep 17 00:00:00 2001 From: Feng Jin Date: Sun, 21 Jul 2024 22:02:34 +0800 Subject: [PATCH] [FLINK-35872][table] Fix the incorrect partition generation for materialized table period refresh in Full Mode This closes #25108 (cherry picked from commit af7d2b3ab0e0ecc4157005704bebac9c767f2e1a) --- .../MaterializedTableManager.java | 14 +- .../MaterializedTableStatementITCase.java | 11 +- .../MaterializedTableManagerTest.java | 405 ++++++++++++++++-- .../flink/table/utils/DateTimeUtils.java | 15 +- 4 files changed, 388 insertions(+), 57 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index aa0877265a05b..2398e72172a81 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -28,6 +28,7 @@ import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; @@ -102,8 +103,9 @@ import static org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig; import static org.apache.flink.table.gateway.service.utils.Constants.CLUSTER_INFO; import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID; -import static 
org.apache.flink.table.utils.DateTimeUtils.formatTimestampString; +import static org.apache.flink.table.utils.DateTimeUtils.formatTimestampStringWithOffset; import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron; +import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration; /** Manager is responsible for execute the {@link MaterializedTableOperation}. */ @Internal @@ -615,6 +617,7 @@ public ResultFetcher refreshMaterializedTable( isPeriodic ? getPeriodRefreshPartition( scheduleTime, + materializedTable.getDefinitionFreshness(), materializedTableIdentifier, materializedTable.getOptions(), operationExecutor @@ -686,13 +689,14 @@ public ResultFetcher refreshMaterializedTable( @VisibleForTesting static Map getPeriodRefreshPartition( String scheduleTime, + IntervalFreshness freshness, ObjectIdentifier materializedTableIdentifier, Map materializedTableOptions, ZoneId localZoneId) { if (scheduleTime == null) { throw new ValidationException( String.format( - "Scheduler time not properly set for periodic refresh of materialized table %s.", + "The scheduler time must not be null during the periodic refresh of the materialized table %s.", materializedTableIdentifier)); } @@ -707,12 +711,14 @@ static Map getPeriodRefreshPartition( PARTITION_FIELDS.length() + 1, partKey.length() - (DATE_FORMATTER.length() + 1)); String partFieldFormatter = materializedTableOptions.get(partKey); + String partFiledValue = - formatTimestampString( + formatTimestampStringWithOffset( scheduleTime, SCHEDULE_TIME_DATE_FORMATTER_DEFAULT, partFieldFormatter, - TimeZone.getTimeZone(localZoneId)); + TimeZone.getTimeZone(localZoneId), + -convertFreshnessToDuration(freshness).toMillis()); if (partFiledValue == null) { throw new SqlExecutionException( String.format( diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index ef67d8032465f..32ccbb95a7ac4 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -1392,22 +1392,21 @@ void testPeriodicRefreshMaterializedTableWithoutPartitionOptions() throws Except @Test void testPeriodicRefreshMaterializedTableWithPartitionOptions() throws Exception { List data = new ArrayList<>(); - data.add(Row.of(1L, 1L, 1L, "2024-01-01")); - data.add(Row.of(2L, 2L, 2L, "2024-01-02")); // create materialized table with partition formatter createAndVerifyCreateMaterializedTableWithData( "my_materialized_table", data, Collections.singletonMap("ds", "yyyy-MM-dd"), - RefreshMode.CONTINUOUS); + RefreshMode.FULL); ObjectIdentifier materializedTableIdentifier = ObjectIdentifier.of( fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table"); // add more data to all data list - data.add(Row.of(3L, 3L, 3L, "2024-01-01")); + data.add(Row.of(1L, 1L, 1L, "2024-01-01")); + data.add(Row.of(2L, 2L, 2L, "2024-01-02")); data.add(Row.of(4L, 4L, 4L, "2024-01-02")); // refresh the materialized table with period schedule @@ -1417,7 +1416,7 @@ void testPeriodicRefreshMaterializedTableWithPartitionOptions() throws Exception sessionHandle, materializedTableIdentifier.asSerializableString(), true, - "2024-01-02 00:00:00", + "2024-01-03 00:00:00", Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); @@ -1485,7 +1484,7 @@ void testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws E .isInstanceOf(ValidationException.class) .hasMessage( String.format( - "Scheduler time not properly set for periodic refresh of materialized table %s.", + "The scheduler time must not be null during the periodic refresh of 
the materialized table %s.", ObjectIdentifier.of( fileSystemCatalogName, TEST_DEFAULT_DATABASE, diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java index 083b4808c3021..082766cd29806 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java @@ -18,17 +18,21 @@ package org.apache.flink.table.gateway.service.materializedtable; -import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import javax.annotation.Nullable; import java.time.ZoneId; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -99,56 +103,365 @@ void testGenerateInsertStatementWithDynamicOptions() { assertThat(actualStatement).isEqualTo(expectedStatement); } - @Test - void testGetPeriodRefreshPartition() { - String schedulerTime = "2024-01-01 00:00:00"; - Map tableOptions = new HashMap<>(); - tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd"); - tableOptions.put("partition.fields.hour.date-formatter", "HH"); - + @ParameterizedTest(name = "{index}: {0}") + @MethodSource("testData") + 
void testGetPeriodRefreshPartition(TestSpec testSpec) { ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", "database", "table"); - Map actualRefreshPartition = - MaterializedTableManager.getPeriodRefreshPartition( - schedulerTime, objectIdentifier, tableOptions, ZoneId.systemDefault()); - Map expectedRefreshPartition = new HashMap<>(); - expectedRefreshPartition.put("day", "2024-01-01"); - expectedRefreshPartition.put("hour", "00"); + if (testSpec.errorMessage == null) { + Map actualRefreshPartition = + MaterializedTableManager.getPeriodRefreshPartition( + testSpec.schedulerTime, + testSpec.freshness, + objectIdentifier, + testSpec.tableOptions, + ZoneId.systemDefault()); - assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition); + assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition); + } else { + assertThatThrownBy( + () -> + MaterializedTableManager.getPeriodRefreshPartition( + testSpec.schedulerTime, + testSpec.freshness, + objectIdentifier, + testSpec.tableOptions, + ZoneId.systemDefault())) + .hasMessage(testSpec.errorMessage); + } } - @Test - void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() { - // scheduler time is null - Map tableOptions = new HashMap<>(); - tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd"); - tableOptions.put("partition.fields.hour.date-formatter", "HH"); + static Stream testData() { + return Stream.of( + // The interval of freshness matches the partition specified by the 'date-formatter'. 
+ TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-02 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + TestSpec.create() + .schedulerTime("2024-01-02 00:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2024-01-01") + .expectedRefreshPartition("hour", "23"), + TestSpec.create() + .schedulerTime("2024-01-02 01:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2024-01-02") + .expectedRefreshPartition("hour", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "22"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "20"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("8")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + 
.expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "16"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("12")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "12"), + TestSpec.create() + .schedulerTime("2024-01-01 12:00:00") + .freshness(IntervalFreshness.ofHour("12")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2024-01-01") + .expectedRefreshPartition("hour", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "59"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "58"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + 
.expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "56"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("5")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "55"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("6")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "54"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("10")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "50"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("12")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "48"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("15")) + .tableOptions("partition.fields.day.date-formatter", 
"yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "45"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("30")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "30"), + TestSpec.create() + .schedulerTime("2024-01-01 00:30:00") + .freshness(IntervalFreshness.ofMinute("30")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2024-01-01") + .expectedRefreshPartition("hour", "00") + .expectedRefreshPartition("minute", "00"), - ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog", "database", "table"); + // The interval of freshness is larger than the partition specified by the + // 'date-formatter'. 
+ TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "00") + .expectedRefreshPartition("minute", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23") + .expectedRefreshPartition("minute", "00"), + TestSpec.create() + .schedulerTime("2024-01-01 01:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .tableOptions("partition.fields.minute.date-formatter", "mm") + .expectedRefreshPartition("day", "2024-01-01") + .expectedRefreshPartition("hour", "00") + .expectedRefreshPartition("minute", "00"), + // The interval of freshness is less than the partition specified by the + // 'date-formatter'. 
+ TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-01 01:00:00") + .freshness(IntervalFreshness.ofHour("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-01 02:00:00") + .freshness(IntervalFreshness.ofHour("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofHour("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-01 04:00:00") + .freshness(IntervalFreshness.ofHour("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("2")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23"), + 
TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("4")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("15")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .expectedRefreshPartition("day", "2023-12-31") + .expectedRefreshPartition("hour", "23"), + TestSpec.create() + .schedulerTime("2024-01-01 00:00:00") + .freshness(IntervalFreshness.ofMinute("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2023-12-31"), + TestSpec.create() + .schedulerTime("2024-01-01 00:01:00") + .freshness(IntervalFreshness.ofMinute("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .expectedRefreshPartition("day", "2024-01-01"), + + // Invalid test case. 
+ TestSpec.create() + .schedulerTime(null) + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .errorMessage( + "The scheduler time must not be null during the periodic refresh of the materialized table `catalog`.`database`.`table`."), + TestSpec.create() + .schedulerTime("2024-01-01") + .freshness(IntervalFreshness.ofDay("1")) + .tableOptions("partition.fields.day.date-formatter", "yyyy-MM-dd") + .tableOptions("partition.fields.hour.date-formatter", "HH") + .errorMessage( + "Failed to parse a valid partition value for the field 'day' in materialized table `catalog`.`database`.`table` using the scheduler time '2024-01-01' based on the date format 'yyyy-MM-dd HH:mm:ss'.")); + } + + private static class TestSpec { + private String schedulerTime; + private IntervalFreshness freshness; + private final Map tableOptions; + private final Map expectedRefreshPartition; + + private @Nullable String errorMessage; + + private TestSpec() { + this.tableOptions = new HashMap<>(); + this.expectedRefreshPartition = new HashMap<>(); + } + + public static TestSpec create() { + return new TestSpec(); + } + + public TestSpec schedulerTime(String schedulerTime) { + this.schedulerTime = schedulerTime; + return this; + } + + public TestSpec freshness(IntervalFreshness freshness) { + this.freshness = freshness; + return this; + } + + public TestSpec tableOptions(String key, String value) { + this.tableOptions.put(key, value); + return this; + } + + public TestSpec expectedRefreshPartition(String key, String value) { + this.expectedRefreshPartition.put(key, value); + return this; + } + + public TestSpec errorMessage(String errorMessage) { + this.errorMessage = errorMessage; + return this; + } - assertThatThrownBy( - () -> - MaterializedTableManager.getPeriodRefreshPartition( - null, - objectIdentifier, - tableOptions, - ZoneId.systemDefault())) - 
.isInstanceOf(ValidationException.class) - .hasMessage( - "Scheduler time not properly set for periodic refresh of materialized table `catalog`.`database`.`table`."); - - // scheduler time is invalid - String invalidSchedulerTime = "2024-01-01"; - assertThatThrownBy( - () -> - MaterializedTableManager.getPeriodRefreshPartition( - invalidSchedulerTime, - objectIdentifier, - tableOptions, - ZoneId.systemDefault())) - .isInstanceOf(SqlExecutionException.class) - .hasMessage( - "Failed to parse a valid partition value for the field 'day' in materialized table `catalog`.`database`.`table` using the scheduler time '2024-01-01' based on the date format 'yyyy-MM-dd HH:mm:ss'."); + @Override + public String toString() { + return "TestSpec{" + + "schedulerTime=" + + schedulerTime + + ", freshness=" + + freshness + + ", tableOptions=" + + tableOptions + + ", expectedRefreshPartition=" + + expectedRefreshPartition + + '}'; + } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java index dfb992bacb7de..4c582010da236 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java @@ -735,12 +735,23 @@ public static String formatTimestampMillis(long ts, String format, TimeZone tz) public static String formatTimestampString( String dateStr, String fromFormat, String toFormat, TimeZone tz) { + return formatTimestampStringWithOffset(dateStr, fromFormat, toFormat, tz, 0); + } + + public static String formatTimestampStringWithOffset( + String dateStr, String fromFormat, String toFormat, TimeZone tz, long offsetMills) { SimpleDateFormat fromFormatter = FORMATTER_CACHE.get(fromFormat); fromFormatter.setTimeZone(tz); SimpleDateFormat toFormatter = FORMATTER_CACHE.get(toFormat); 
toFormatter.setTimeZone(tz); try { - return toFormatter.format(fromFormatter.parse(dateStr)); + Date date = fromFormatter.parse(dateStr); + + if (offsetMills != 0) { + date = new Date(date.getTime() + offsetMills); + } + + return toFormatter.format(date); } catch (ParseException e) { LOG.error( "Exception when formatting: '" @@ -749,6 +760,8 @@ public static String formatTimestampString( + fromFormat + "' to: '" + toFormat + + "' with offsetMills: '" + + offsetMills + "'", e); return null;