Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
add AD task cache (#337)
Browse files Browse the repository at this point in the history
* add AD task cache

* add java doc for exception

* change to reserved memory

* fix shingle memory calculation;store threshold model training data in double array

* address comments
  • Loading branch information
ylwu-amzn committed Dec 23, 2020
1 parent 2ae77ed commit d5683f6
Show file tree
Hide file tree
Showing 9 changed files with 713 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.INDEX_PRESSURE_SOFT_LIMIT,
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND,
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE
);
return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public class MemoryTracker {

public enum Origin {
SINGLE_ENTITY_DETECTOR,
MULTI_ENTITY_DETECTOR
MULTI_ENTITY_DETECTOR,
HISTORICAL_SINGLE_ENTITY_DETECTOR,
}

// memory tracker for total consumption of bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ public LimitExceededException(String anomalyDetectorId, String message) {
super(anomalyDetectorId, message, true);
}

/**
* Constructor with error message.
*
* @param message explanation for the limit
*/
public LimitExceededException(String message) {
super(null, message, true);
}

/**
* Constructor with an anomaly detector ID and an explanation, and a flag for stopping.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,18 @@ private AnomalyDetectorSettings() {}
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// Maximum number of batch tasks running on one node.
// TODO: performance test and tune the setting.
public static final Setting<Integer> MAX_BATCH_TASK_PER_NODE = Setting
.intSetting(
"opendistro.anomaly_detection.max_batch_task_per_node",
2,
1,
100,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static int THRESHOLD_MODEL_TRAINING_SIZE = 1000;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.task;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.NUM_MIN_SAMPLES;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.NUM_TREES;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.THRESHOLD_MODEL_TRAINING_SIZE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.TIME_DECAY;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.amazon.opendistroforelasticsearch.ad.ml.HybridThresholdingModel;
import com.amazon.opendistroforelasticsearch.ad.ml.ThresholdingModel;
import com.amazon.opendistroforelasticsearch.ad.model.ADTask;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.randomcutforest.RandomCutForest;

/**
* AD batch task cache which will hold RCF, threshold model, shingle and training data.
*/
public class ADBatchTaskCache {
private final String detectorId;
private RandomCutForest rcfModel;
private ThresholdingModel thresholdModel;
private boolean thresholdModelTrained;
private Deque<Map.Entry<Long, Optional<double[]>>> shingle;
private AtomicInteger thresholdModelTrainingDataSize = new AtomicInteger(0);
private double[] thresholdModelTrainingData;
private AtomicBoolean cancelled = new AtomicBoolean(false);
private AtomicLong cacheMemorySize = new AtomicLong(0);
private String cancelReason;
private String cancelledBy;

protected ADBatchTaskCache(ADTask adTask) {
this.detectorId = adTask.getDetectorId();

AnomalyDetector detector = adTask.getDetector();
rcfModel = RandomCutForest
.builder()
.dimensions(detector.getShingleSize() * detector.getEnabledFeatureIds().size())
.numberOfTrees(NUM_TREES)
.lambda(TIME_DECAY)
.sampleSize(NUM_SAMPLES_PER_TREE)
.outputAfter(NUM_MIN_SAMPLES)
.parallelExecutionEnabled(false)
.build();

this.thresholdModel = new HybridThresholdingModel(
AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE,
AnomalyDetectorSettings.THRESHOLD_MAX_RANK_ERROR,
AnomalyDetectorSettings.THRESHOLD_MAX_SCORE,
AnomalyDetectorSettings.THRESHOLD_NUM_LOGNORMAL_QUANTILES,
AnomalyDetectorSettings.THRESHOLD_DOWNSAMPLES,
AnomalyDetectorSettings.THRESHOLD_MAX_SAMPLES
);
this.thresholdModelTrainingData = new double[THRESHOLD_MODEL_TRAINING_SIZE];
this.thresholdModelTrained = false;
this.shingle = new ArrayDeque<>(detector.getShingleSize());
}

protected String getDetectorId() {
return detectorId;
}

protected RandomCutForest getRcfModel() {
return rcfModel;
}

protected Deque<Map.Entry<Long, Optional<double[]>>> getShingle() {
return shingle;
}

protected ThresholdingModel getThresholdModel() {
return thresholdModel;
}

protected void setThresholdModelTrained(boolean thresholdModelTrained) {
this.thresholdModelTrained = thresholdModelTrained;
}

protected boolean isThresholdModelTrained() {
return thresholdModelTrained;
}

protected double[] getThresholdModelTrainingData() {
return thresholdModelTrainingData;
}

protected void clearTrainingData() {
this.thresholdModelTrainingData = null;
this.thresholdModelTrainingDataSize.set(0);
}

public AtomicInteger getThresholdModelTrainingDataSize() {
return thresholdModelTrainingDataSize;
}

protected AtomicLong getCacheMemorySize() {
return cacheMemorySize;
}

protected boolean isCancelled() {
return cancelled.get();
}

protected String getCancelReason() {
return cancelReason;
}

protected String getCancelledBy() {
return cancelledBy;
}

protected void cancel(String reason, String userName) {
this.cancelled.compareAndSet(false, true);
this.cancelReason = reason;
this.cancelledBy = userName;
}
}
Loading

0 comments on commit d5683f6

Please sign in to comment.