Skip to content

Commit

Permalink
Parse selective stats from query stats and address other review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
agarwali committed Jul 3, 2024
1 parent 98bb9a6 commit 913fd9f
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public enum StatisticName
ROWS_DELETED("rowsDeleted"),
FILES_LOADED("filesLoaded"),
ROWS_WITH_ERRORS("rowsWithErrors"),
QUERY_OPERATOR_STATS("queryOperatorStats");
EXTERNAL_BYTES_SCANNED("externalBytesScanned");

String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public class LogicalPlanFactory
public static final String TABLE_ALIAS = "X";
public static final String MAX_OF_FIELD = "MAX";
public static final String MIN_OF_FIELD = "MIN";
public static final String QUERY_OPERATOR_STATS_QUERY_ID = "GET_QUERY_OPERATOR_STATS";
public static final String QUERY_ID = "{QUERY_ID}";

public static LogicalPlan getLogicalPlanForIsDatasetEmpty(Dataset dataset)
{
Expand Down Expand Up @@ -95,19 +93,7 @@ public static LogicalPlan getLogicalPlanForConstantStats(String stats, Long valu
.build();
}

public static LogicalPlan getLogicalPlanForQueryOperatorStats()
{
return LogicalPlan.builder()
.addOps(Selection.builder()
.addFields(All.INSTANCE)
.source(FunctionalDataset
.builder()
.name(QUERY_OPERATOR_STATS_QUERY_ID)
.addValue(StringValue.of(QUERY_ID))
.build())
.build())
.build();
}


public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ public enum FunctionName
TO_VARIANT,
TO_JSON,
CONVERT,
JSON_EXTRACT_PATH_TEXT,
STRUCT;
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public abstract class IngestorResultAbstract

public abstract String ingestionTimestampUTC();

public abstract Optional<String> queryId();

@Value.Default
public boolean previouslyProcessed()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public enum FunctionName
ARRAY_CONSTRUCT("ARRAY_CONSTRUCT"),
TO_JSON("TO_JSON"),
CONVERT("CONVERT"),
JSON_EXTRACT_PATH_TEXT("JSON_EXTRACT_PATH_TEXT"),
STRUCT("STRUCT");

private static final Map<String, FunctionName> BY_NAME = Arrays
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,33 @@
import org.finos.legend.engine.persistence.components.relational.snowflake.optmizer.UpperCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.SnowflakeDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.SnowflakeJdbcPropertiesToLogicalDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.*;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.AlterVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.BatchEndTimestampVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.CastFunctionVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.ClusterKeyVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.CopyVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.DatasetAdditionalPropertiesVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.DigestUdfVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.FieldVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.FunctionalDatasetVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.MetadataFileNameFieldVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.MetadataRowNumberFieldVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.SQLCreateVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.SchemaDefinitionVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.ShowVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.StagedFilesDatasetReferenceVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.StagedFilesDatasetVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.StagedFilesFieldValueVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.StagedFilesSelectionVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.ToArrayFunctionVisitor;
import org.finos.legend.engine.persistence.components.relational.snowflake.sql.visitor.TryCastFunctionVisitor;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.sqldom.utils.SqlGenUtils;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.Transformer;
import org.finos.legend.engine.persistence.components.util.Capability;
import org.finos.legend.engine.persistence.components.util.QueryStatsLogicalPlanUtils;
import org.finos.legend.engine.persistence.components.util.PlaceholderValue;
import org.finos.legend.engine.persistence.components.util.ValidationCategory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -525,26 +545,13 @@ public IngestorResult performBulkLoad(Datasets datasets, Executor<SqlGen, Tabula

Map<StatisticName, Object> stats = new HashMap<>();

resultData.queryId().ifPresent(queryId ->
{
RelationalTransformer transformer = new RelationalTransformer(SnowflakeSink.get());
SqlPlan physicalPlanForQueryOperatorStats = transformer.generatePhysicalPlan(LogicalPlanFactory.getLogicalPlanForQueryOperatorStats());
HashMap<String, PlaceholderValue> queryIdPlaceHolder = new HashMap<>();
queryIdPlaceHolder.put("{QUERY_ID}", PlaceholderValue.builder().value(queryId).isSensitive(false).build());
List<TabularData> queryOperatorStats = executor.executePhysicalPlanAndGetResults(physicalPlanForQueryOperatorStats, queryIdPlaceHolder);
stats.put(StatisticName.QUERY_OPERATOR_STATS, queryOperatorStats.stream().map(TabularData::data).collect(Collectors.toList()));
});

stats.put(StatisticName.ROWS_INSERTED, totalRowsLoaded);
stats.put(StatisticName.ROWS_WITH_ERRORS, totalRowsWithError);
stats.put(StatisticName.FILES_LOADED, totalFilesLoaded);

IngestorResult.Builder resultBuilder = IngestorResult.builder()
.queryId(resultData.queryId())
.updatedDatasets(datasets)
.putAllStatisticByName(stats)
.ingestionTimestampUTC(placeHolderKeyValues.get(BATCH_START_TS_PATTERN).value())
.batchId(Optional.ofNullable(placeHolderKeyValues.containsKey(BATCH_ID_PATTERN) ? Integer.valueOf(placeHolderKeyValues.get(BATCH_ID_PATTERN).value()) : null));
resultData.queryId().ifPresent(queryId -> enhanceWithLoadStatistics(executor, stats, queryId));

IngestorResult.Builder resultBuilder = IngestorResult.builder().updatedDatasets(datasets).putAllStatisticByName(stats).ingestionTimestampUTC(placeHolderKeyValues.get(BATCH_START_TS_PATTERN).value()).batchId(Optional.ofNullable(placeHolderKeyValues.containsKey(BATCH_ID_PATTERN) ? Integer.valueOf(placeHolderKeyValues.get(BATCH_ID_PATTERN).value()) : null));
IngestorResult result;

if (dataFilePathsWithErrors.isEmpty())
Expand All @@ -565,6 +572,35 @@ public IngestorResult performBulkLoad(Datasets datasets, Executor<SqlGen, Tabula
return result;
}

/**
 * Enriches the given stats map with post-load statistics parsed from Snowflake's
 * GET_QUERY_OPERATOR_STATS output for the COPY query identified by {@code queryId}.
 *
 * <p>Best-effort: any failure while fetching or parsing the operator stats is logged
 * and swallowed so that stats extraction never fails the ingestion itself.
 *
 * @param executor executor used to run the generated physical plan
 * @param stats    mutable map of statistics to enrich (modified in place)
 * @param queryId  Snowflake query id of the completed load query
 */
private void enhanceWithLoadStatistics(Executor<SqlGen, TabularData, SqlPlan> executor, Map<StatisticName, Object> stats, String queryId)
{
    try
    {
        RelationalTransformer transformer = new RelationalTransformer(SnowflakeSink.get());
        SqlPlan physicalPlanForQueryOperatorStats = transformer.generatePhysicalPlan(QueryStatsLogicalPlanUtils.getLogicalPlanForQueryOperatorStats());
        HashMap<String, PlaceholderValue> queryIdPlaceHolder = new HashMap<>();
        // Query id is not sensitive, so it is safe to substitute it as a plain placeholder value
        queryIdPlaceHolder.put(QueryStatsLogicalPlanUtils.QUERY_ID_PARAMETER, PlaceholderValue.builder().value(queryId).isSensitive(false).build());
        List<TabularData> queryOperatorStats = executor.executePhysicalPlanAndGetResults(physicalPlanForQueryOperatorStats, queryIdPlaceHolder);
        if (!queryOperatorStats.isEmpty())
        {
            List<Map<String, Object>> queryOperatorStatsResults = queryOperatorStats.get(0).data();
            // Each row describes one operator stage; pick out the stages we care about.
            queryOperatorStatsResults.forEach(queryStats ->
            {
                switch ((String) queryStats.get(QueryStatsLogicalPlanUtils.OPERATOR_TYPE_ALIAS))
                {
                    case QueryStatsLogicalPlanUtils.EXTERNAL_SCAN_STAGE:
                        stats.put(StatisticName.EXTERNAL_BYTES_SCANNED, queryStats.get(QueryStatsLogicalPlanUtils.EXTERNAL_BYTES_SCANNED_ALIAS));
                        break; // without this break, the ExternalScan row would also clobber INCOMING_RECORD_COUNT
                    case QueryStatsLogicalPlanUtils.INSERT_STAGE:
                        stats.put(StatisticName.INCOMING_RECORD_COUNT, queryStats.get(QueryStatsLogicalPlanUtils.INPUT_ROWS_ALIAS));
                        break;
                    default:
                        // Other operator stages carry no statistics we currently surface
                        break;
                }
            });
        }
    }
    catch (Exception e)
    {
        // Stats enrichment is best-effort; never let it fail the load itself
        LOGGER.error(String.format("Error extracting query stats for query id: [%s].", queryId), e);
    }
}

private String getErrorMessage(Map<String, Object> row)
{
Map<String, Object> errorInfoMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.util;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FunctionalDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName;
import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue;

/**
 * Utility for building the logical plan that fetches selective query-operator
 * statistics (external bytes scanned, input rows) from Snowflake's
 * GET_QUERY_OPERATOR_STATS table function for a given query id.
 */
public class QueryStatsLogicalPlanUtils
{
    public static final String QUERY_OPERATOR_STATS_FUNCTION = "GET_QUERY_OPERATOR_STATS";
    public static final String QUERY_ID_PARAMETER = "{QUERY_ID}";
    public static final String OPERATOR_TYPE = "OPERATOR_TYPE";
    public static final String OPERATOR_STATISTICS = "OPERATOR_STATISTICS";

    public static final String OPERATOR_TYPE_ALIAS = "operatorType";
    public static final String EXTERNAL_BYTES_SCANNED_ALIAS = "externalBytesScanned";
    public static final String INPUT_ROWS_ALIAS = "inputRows";

    public static final String EXTERNAL_BYTES_SCANNED_PATH = "io.external_bytes_scanned";
    public static final String INPUT_ROWS_PATH = "input_rows";

    public static final String EXTERNAL_SCAN_STAGE = "ExternalScan";
    public static final String INSERT_STAGE = "Insert";

    // Utility class; not meant to be instantiated
    private QueryStatsLogicalPlanUtils()
    {
    }

    /**
     * Builds a logical plan equivalent to:
     * SELECT OPERATOR_TYPE as operatorType,
     *        JSON_EXTRACT_PATH_TEXT(OPERATOR_STATISTICS, 'io.external_bytes_scanned') as externalBytesScanned,
     *        JSON_EXTRACT_PATH_TEXT(OPERATOR_STATISTICS, 'input_rows') as inputRows
     * FROM TABLE(GET_QUERY_OPERATOR_STATS('{QUERY_ID}'))
     */
    public static LogicalPlan getLogicalPlanForQueryOperatorStats()
    {
        Selection selection = Selection.builder()
            .addFields(FieldValue.builder().fieldName(OPERATOR_TYPE).alias(OPERATOR_TYPE_ALIAS).build())
            .addFields(extractedStatsField(EXTERNAL_BYTES_SCANNED_PATH, EXTERNAL_BYTES_SCANNED_ALIAS))
            .addFields(extractedStatsField(INPUT_ROWS_PATH, INPUT_ROWS_ALIAS))
            .source(FunctionalDataset.builder()
                .name(QUERY_OPERATOR_STATS_FUNCTION)
                .addValue(StringValue.of(QUERY_ID_PARAMETER))
                .build())
            .build();
        return LogicalPlan.builder().addOps(selection).build();
    }

    // JSON_EXTRACT_PATH_TEXT(OPERATOR_STATISTICS, jsonPath) aliased for downstream parsing
    private static FunctionImpl extractedStatsField(String jsonPath, String alias)
    {
        return FunctionImpl.builder()
            .functionName(FunctionName.JSON_EXTRACT_PATH_TEXT)
            .addValue(FieldValue.builder().fieldName(OPERATOR_STATISTICS).build())
            .addValue(StringValue.of(jsonPath))
            .alias(alias)
            .build();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@
package org.finos.legend.engine.persistence.components.util;

import java.util.List;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.snowflake.SnowflakeSink;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class LogicalPlanFactoryTest
public class QueryStatsLogicalPlanUtilsTest
{
    /**
     * Verifies that the query-operator-stats logical plan is rendered to the
     * expected Snowflake SQL, including the aliased JSON extractions.
     */
    @Test
    public void testLogicalPlanForQueryOperatorStats()
    {
        RelationalTransformer snowflakeTransformer = new RelationalTransformer(SnowflakeSink.get());
        SqlPlan sqlPlan = snowflakeTransformer.generatePhysicalPlan(QueryStatsLogicalPlanUtils.getLogicalPlanForQueryOperatorStats());
        String expectedQuery = "SELECT \"OPERATOR_TYPE\" as \"operatorType\",JSON_EXTRACT_PATH_TEXT(\"OPERATOR_STATISTICS\",'io.external_bytes_scanned') as \"externalBytesScanned\",JSON_EXTRACT_PATH_TEXT(\"OPERATOR_STATISTICS\",'input_rows') as \"inputRows\" FROM TABLE(GET_QUERY_OPERATOR_STATS('{QUERY_ID}'))";
        List<String> generatedSql = sqlPlan.getSqlList();
        Assertions.assertEquals(expectedQuery, generatedSql.get(0));
    }
}

0 comments on commit 913fd9f

Please sign in to comment.