Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.17] Passing user to getLatestDataTime call #1298

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.index.query.BoolQueryBuilder;
Expand Down Expand Up @@ -155,14 +156,28 @@ public SearchFeatureDao(
* @param listener onResponse is called with the epoch time of the latest data under the detector
*/
public void getLatestDataTime(Config config, Optional<Entity> entity, AnalysisType context, ActionListener<Optional<Long>> listener) {
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery();
getLatestDataTime(null, config, entity, context, listener);
}

/**
* Returns to listener the epoch time of the latest data under the detector.
*
* @param config info about the data
* @param listener onResponse is called with the epoch time of the latest data under the detector
*/
public void getLatestDataTime(
User user,
Config config,
Optional<Entity> entity,
AnalysisType context,
ActionListener<Optional<Long>> listener
) {
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery();
if (entity.isPresent()) {
for (TermQueryBuilder term : entity.get().getTermQueryForCustomerIndex()) {
internalFilterQuery.filter(term);
}
}

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(internalFilterQuery)
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(config.getTimeField()))
Expand All @@ -172,15 +187,27 @@ public void getLatestDataTime(Config config, Optional<Entity> entity, AnalysisTy
.wrap(response -> listener.onResponse(ParseUtils.getLatestDataTime(response)), listener::onFailure);
// using the original context in listener as user roles have no permissions for internal operations like fetching a
// checkpoint
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
config.getId(),
client,
context,
searchResponseListener
);
if (user != null) {
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
user,
client,
context,
searchResponseListener
);
} else {
clientUtil
.<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity(
searchRequest,
client::search,
config.getId(),
client,
context,
searchResponseListener
);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public abstract class AbstractTimeSeriesActionHandler<T extends ActionResponse,

public static final String NAME_REGEX = "[a-zA-Z0-9._-]+";
public static final Integer MAX_NAME_SIZE = 64;
public static final String CATEGORY_NOT_FOUND_ERR_MSG = "Can't find the categorical field %s";
public static final String CATEGORY_NOT_FOUND_ERR_MSG = "Can't find the categorical field %s in index %s";

public static String INVALID_NAME_SIZE = "Name should be shortened. The maximum limit is "
+ AbstractTimeSeriesActionHandler.MAX_NAME_SIZE
Expand Down Expand Up @@ -562,11 +562,11 @@ protected void validateCategoricalFieldsInAllIndices(String configId, boolean in

Iterator<Map.Entry<String, List<String>>> iterator = clusterIndicesMap.entrySet().iterator();

validateCategoricalFieldRecursive(iterator, configId, indexingDryRun, listener);
validateCategoricalField(iterator, configId, indexingDryRun, listener);

}

protected void validateCategoricalFieldRecursive(
protected void validateCategoricalField(
Iterator<Map.Entry<String, List<String>>> iterator,
String configId,
boolean indexingDryRun,
Expand Down Expand Up @@ -645,13 +645,19 @@ protected void validateCategoricalFieldRecursive(
listener
.onFailure(
createValidationException(
String.format(Locale.ROOT, CATEGORY_NOT_FOUND_ERR_MSG, categoryField0),
String
.format(
Locale.ROOT,
CATEGORY_NOT_FOUND_ERR_MSG,
categoryField0,
Arrays.toString(clusterIndicesEntry.getValue().toArray(new String[0]))
),
ValidationIssueType.CATEGORY
)
);
return;
}
validateCategoricalFieldRecursive(iterator, configId, indexingDryRun, listener);
validateCategoricalField(iterator, configId, indexingDryRun, listener);

}, error -> {
String message = String.format(Locale.ROOT, CommonMessages.FAIL_TO_GET_MAPPING_MSG, config.getIndices());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ public void onFailure(Exception e) {
* interval can be a practical approach in scenarios where data arrival is irregular and there's
* a need to balance between capturing data features and avoiding over-sampling.
*
* @param topEntity top entity to use
* @param timeStampBounds Used to determine start and end date range to search for data
* @param listener returns minimum interval
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public LatestTimeRetriever(
* @param listener to return latest time and entity attributes if the config is HC
*/
public void checkIfHC(ActionListener<Pair<Optional<Long>, Map<String, Object>>> listener) {
searchFeatureDao.getLatestDataTime(config, Optional.empty(), context, ActionListener.wrap(latestTime -> {
searchFeatureDao.getLatestDataTime(user, config, Optional.empty(), context, ActionListener.wrap(latestTime -> {
if (latestTime.isEmpty()) {
listener.onResponse(Pair.of(Optional.empty(), Collections.emptyMap()));
} else if (config.isHighCardinality()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ public void start() {
latestEntityAttributes.getRight()
),
exception -> {
listener.onFailure(exception);
logger.error("Failed to create search request for last data point", exception);
listener.onFailure(exception);
}
);
latestTimeRetriever.checkIfHC(latestTimeListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public static HashMap<String, List<String>> separateClusterIndexes(List<String>
Pair<String, String> clusterAndIndex = parseClusterAndIndexName(index);
String clusterName = clusterAndIndex.getKey();
String indexName = clusterAndIndex.getValue();
logger.info("clusterName: " + clusterName);
logger.info("indexName: " + indexName);

// If the index entry does not have a cluster_name, it indicates the index is on the local cluster.
if (clusterName.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,7 @@ public void testValidateAnomalyDetectorWithWrongCategoryField() throws Exception
.extractValue("detector", responseMap);
assertEquals(
"non-existing category",
String.format(Locale.ROOT, AbstractTimeSeriesActionHandler.CATEGORY_NOT_FOUND_ERR_MSG, "host.keyword"),
String.format(Locale.ROOT, AbstractTimeSeriesActionHandler.CATEGORY_NOT_FOUND_ERR_MSG, "host.keyword", "[index-test]"),
messageMap.get("category_field").get("message")
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1709,7 +1709,7 @@ public void testValidateHC() throws Exception {
assertEquals("Validate forecaster model failed", RestStatus.OK, TestHelpers.restStatus(response));
responseMap = entityAsMap(response);
validations = (Map<String, Object>) ((Map<String, Object>) responseMap.get("forecaster")).get("category_field");
assertEquals("Can't find the categorical field 476465", validations.get("message"));
assertEquals("Can't find the categorical field 476465 in index [rule]", validations.get("message"));

// case 3: validate data sparsity with one categorical field
forecasterDef = "{\n"
Expand Down
Loading