Skip to content

Commit

Permalink
Stats API: moved detector count call outside transport layer and make…
Browse files Browse the repository at this point in the history
  • Loading branch information
jmazanec15 committed May 6, 2020
1 parent c44b951 commit 7a128d9
Show file tree
Hide file tree
Showing 17 changed files with 511 additions and 167 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorAction',
'com.amazon.opendistroforelasticsearch.ad.transport.CronTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.CronRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction',
'com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction',
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner',
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.CounterSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.DocumentCountSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.IndexStatusSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.CronAction;
Expand Down Expand Up @@ -209,6 +209,7 @@ public List<RestHandler> getRestHandlers(
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(
restController,
adStats,
clusterService,
this.nodeFilter
);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(
Expand Down Expand Up @@ -341,10 +342,7 @@ public Collection<Object> createComponents(
StatNames.MODELS_CHECKPOINT_INDEX_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.CHECKPOINT_INDEX_NAME))
)
.put(
StatNames.DETECTOR_COUNT.getName(),
new ADStat<>(true, new DocumentCountSupplier(indexUtils, AnomalyDetector.ANOMALY_DETECTORS_INDEX))
)
.put(StatNames.DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.build();

adStats = new ADStats(indexUtils, modelManager, stats);
Expand Down Expand Up @@ -440,7 +438,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new ActionHandler<>(ThresholdResultAction.INSTANCE, ThresholdResultTransportAction.class),
new ActionHandler<>(AnomalyResultAction.INSTANCE, AnomalyResultTransportAction.class),
new ActionHandler<>(CronAction.INSTANCE, CronTransportAction.class),
new ActionHandler<>(ADStatsAction.INSTANCE, ADStatsTransportAction.class)
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,34 @@

import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsRequest;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

Expand All @@ -45,19 +58,27 @@ public class RestStatsAnomalyDetectorAction extends BaseRestHandler {
private static final String STATS_ANOMALY_DETECTOR_ACTION = "stats_anomaly_detector";
private ADStats adStats;
private DiscoveryNodeFilterer nodeFilter;
private ClusterService clusterService;

/**
* Constructor
*
* @param controller Rest Controller
* @param adStats ADStats object
* @param clusterService ClusterService
* @param nodeFilter util class to get eligible data nodes
*/
public RestStatsAnomalyDetectorAction(RestController controller, ADStats adStats, DiscoveryNodeFilterer nodeFilter) {
public RestStatsAnomalyDetectorAction(
RestController controller,
ADStats adStats,
ClusterService clusterService,
DiscoveryNodeFilterer nodeFilter
) {
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/{nodeId}/stats/", this);
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/{nodeId}/stats/{stat}", this);
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/stats/", this);
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/stats/{stat}", this);
this.clusterService = clusterService;
this.adStats = adStats;
this.nodeFilter = nodeFilter;
}
Expand All @@ -73,14 +94,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
ADStatsRequest adStatsRequest = getRequest(request);
return channel -> client.execute(ADStatsAction.INSTANCE, adStatsRequest, new RestActions.NodesResponseRestListener<>(channel));
return channel -> getStats(client, channel, adStatsRequest);
}

/**
* Creates a ADStatsRequest from a RestRequest
*
* @param request RestRequest
* @return ADStatsRequest
* @return ADStatsRequest Request containing stats to be retrieved
*/
private ADStatsRequest getRequest(RestRequest request) {
// parse the nodes the user wants to query the stats for
Expand Down Expand Up @@ -129,4 +150,112 @@ private ADStatsRequest getRequest(RestRequest request) {
}
return adStatsRequest;
}

/**
* Make the 2 requests to get the node and cluster statistics
*
* @param client Client
* @param channel Channel to send response
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getStats(Client client, RestChannel channel, ADStatsRequest adStatsRequest) {
// Use MultiResponsesDelegateActionListener to execute 2 async requests and create the response once they finish
MultiResponsesDelegateActionListener<ADStatsResponse> delegateListener = new MultiResponsesDelegateActionListener<>(
getRestStatsListener(channel),
2,
"Unable to return AD Stats"
);

getClusterStats(client, delegateListener, adStatsRequest);
getNodeStats(client, delegateListener, adStatsRequest);
}

/**
* Make async request to get the number of detectors in AnomalyDetector.ANOMALY_DETECTORS_INDEX if necessary
* and, onResponse, gather the cluster statistics
*
* @param client Client
* @param listener MultiResponsesDelegateActionListener to be used once both requests complete
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getClusterStats(
Client client,
MultiResponsesDelegateActionListener<ADStatsResponse> listener,
ADStatsRequest adStatsRequest
) {
ADStatsResponse adStatsResponse = new ADStatsResponse();
if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) {
if (clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().docs(true);
client.execute(IndicesStatsAction.INSTANCE, indicesStatsRequest, ActionListener.wrap(indicesStatsResponse -> {
adStats
.getStat(StatNames.DETECTOR_COUNT.getName())
.setValue(indicesStatsResponse.getIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX).getPrimaries().docs.getCount());
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}, e -> listener.onFailure(new RuntimeException("Failed to get AD cluster stats", e))));
} else {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(0L);
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}
} else {
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}
}

/**
* Make async request to get the Anomaly Detection statistics from each node and, onResponse, set the
* ADStatsNodesResponse field of ADStatsResponse
*
* @param client Client
* @param listener MultiResponsesDelegateActionListener to be used once both requests complete
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getNodeStats(
Client client,
MultiResponsesDelegateActionListener<ADStatsResponse> listener,
ADStatsRequest adStatsRequest
) {
client.execute(ADStatsNodesAction.INSTANCE, adStatsRequest, ActionListener.wrap(adStatsResponse -> {
ADStatsResponse restADStatsResponse = new ADStatsResponse();
restADStatsResponse.setADStatsNodesResponse(adStatsResponse);
listener.onResponse(restADStatsResponse);
}, listener::onFailure));
}

/**
* Collect Cluster Stats into map to be retrieved
*
* @param adStatsRequest Request containing stats to be retrieved
* @return Map containing Cluster Stats
*/
private Map<String, Object> getClusterStatsMap(ADStatsRequest adStatsRequest) {
Map<String, Object> clusterStats = new HashMap<>();
Set<String> statsToBeRetrieved = adStatsRequest.getStatsToBeRetrieved();
adStats
.getClusterStats()
.entrySet()
.stream()
.filter(s -> statsToBeRetrieved.contains(s.getKey()))
.forEach(s -> clusterStats.put(s.getKey(), s.getValue().getValue()));
return clusterStats;
}

/**
* Listener sends response once Node Stats and Cluster Stats are gathered
*
* @param channel Channel
* @return ActionListener for ADStatsResponse
*/
private ActionListener<ADStatsResponse> getRestStatsListener(RestChannel channel) {
return ActionListener
.wrap(
adStatsResponse -> {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, adStatsResponse.toXContent(channel.newBuilder())));
},
exception -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, exception.getMessage()))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.ad.stats;

import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.CounterSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;

import java.util.function.Supplier;

Expand Down Expand Up @@ -55,6 +56,17 @@ public T getValue() {
return supplier.get();
}

/**
* Set the value of the statistic
*
* @param value set value
*/
public void setValue(Long value) {
if (supplier instanceof SettableSupplier) {
((SettableSupplier) supplier).set(value);
}
}

/**
* Increments the supplier if it can be incremented
*/
Expand Down
Loading

0 comments on commit 7a128d9

Please sign in to comment.