make 1M1min possible

This PR improves performance to make the 1M1min experiment possible.

1. Coordinating-node pagination in AnomalyResultTransportAction now runs in async instead of sync mode, so the coordinating node no longer has to wait for model nodes' responses before fetching the next page.
2. During the million-entity evaluation, CPU usage sits around 1% with hourly spikes up to 65%. The internal hourly maintenance job accounts for the spikes: it saves hundreds of thousands of model checkpoints, clears unused models, and performs bookkeeping for internal state. This PR introduces CheckpointMaintainWorker to spread that work more evenly across a larger maintenance window (an illustrative sketch follows below).
3. On model corruption, cold start is retriggered as mitigation. See ModelManager.score, EntityResultTransportAction, and CheckpointReadWorker.
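The load-spreading idea behind the new CheckpointMaintainWorker can be illustrated with a small self-contained sketch. This is not the plugin code: the class name CheckpointSlotDemo and the dueThisHour helper are made up for illustration, and Math.floorMod is used in place of the Math.abs(hashCode()) % interval check visible in the diff below (floorMod sidesteps the negative-hash edge case). Each model id hashes to a fixed hourly slot within the checkpoint-saving interval, so a given hourly maintenance pass re-checkpoints only about 1/interval of the models instead of all of them at once.

import java.time.Clock;
import java.time.ZoneOffset;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: shows how hashing model ids to hourly slots spreads
// checkpoint maintenance across the saving interval instead of producing
// one big hourly spike.
public final class CheckpointSlotDemo {

    // True if the given model falls into the slot assigned to the current hour.
    static boolean dueThisHour(String modelId, int checkpointIntervalHrs, Clock clock) {
        int currentHour = clock.instant().atZone(ZoneOffset.UTC).getHour();
        int currentSlot = currentHour % checkpointIntervalHrs;
        return Math.floorMod(modelId.hashCode(), checkpointIntervalHrs) == currentSlot;
    }

    public static void main(String[] args) {
        Clock clock = Clock.systemUTC();
        List<String> modelIds = List.of("detectorA_entity1", "detectorA_entity2", "detectorB_entity9");
        // Only ids whose slot matches the current hour would be enqueued for a
        // checkpoint write; the rest wait for their own hour.
        List<String> due = modelIds.stream()
            .filter(id -> dueThisHour(id, 6, clock)) // assume a 6-hour checkpoint-saving interval
            .collect(Collectors.toList());
        System.out.println("Models to checkpoint this hour: " + due);
    }
}

Over a full interval every model is still saved once; the checkpoint writes that dominate the hourly spike are simply divided roughly evenly across the hours.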

Testing done:
1. Added unit tests.
2. Manually confirmed 1M1min is possible after the above changes.

Signed-off-by: Kaituo Li <kaituo@amazon.com>
kaituo committed Jul 28, 2022
1 parent 9f6a5ab commit 7686723
Showing 30 changed files with 1,468 additions and 368 deletions.
41 changes: 38 additions & 3 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
@@ -58,6 +58,8 @@
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.ratelimit.CheckPointMaintainRequestAdapter;
import org.opensearch.ad.ratelimit.CheckpointMaintainWorker;
import org.opensearch.ad.ratelimit.CheckpointReadWorker;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.ratelimit.ColdEntityWorker;
@@ -455,6 +457,16 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {

Random random = new Random(42);

CacheProvider cacheProvider = new CacheProvider();

CheckPointMaintainRequestAdapter adapter = new CheckPointMaintainRequestAdapter(
cacheProvider,
checkpoint,
CommonName.CHECKPOINT_INDEX_NAME,
AnomalyDetectorSettings.CHECKPOINT_SAVING_FREQ,
getClock()
);

CheckpointWriteWorker checkpointWriteQueue = new CheckpointWriteWorker(
heapSizeBytes,
AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_SIZE_IN_BYTES,
@@ -477,6 +489,26 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.HOURLY_MAINTENANCE
);

CheckpointMaintainWorker checkpointMaintainQueue = new CheckpointMaintainWorker(
heapSizeBytes,
AnomalyDetectorSettings.CHECKPOINT_MAINTAIN_REQUEST_SIZE_IN_BYTES,
AnomalyDetectorSettings.CHECKPOINT_MAINTAIN_QUEUE_MAX_HEAP_PERCENT,
clusterService,
random,
adCircuitBreakerService,
threadPool,
settings,
AnomalyDetectorSettings.MAX_QUEUED_TASKS_RATIO,
getClock(),
AnomalyDetectorSettings.MEDIUM_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.LOW_SEGMENT_PRUNE_RATIO,
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
checkpointWriteQueue,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager,
adapter
);

EntityCache cache = new PriorityCache(
checkpoint,
AnomalyDetectorSettings.DEDICATED_CACHE_SIZE.get(settings),
@@ -489,10 +521,11 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
threadPool,
checkpointWriteQueue,
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT
AnomalyDetectorSettings.MAINTENANCE_FREQ_CONSTANT,
checkpointMaintainQueue
);

CacheProvider cacheProvider = new CacheProvider(cache);
cacheProvider.set(cache);

EntityColdStarter entityColdStarter = new EntityColdStarter(
getClock(),
@@ -544,7 +577,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE,
AnomalyDetectorSettings.MIN_PREVIEW_SIZE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
AnomalyDetectorSettings.CHECKPOINT_SAVING_FREQ,
entityColdStarter,
featureManager,
memoryTracker
@@ -857,8 +890,10 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.CHECKPOINT_READ_QUEUE_MAX_HEAP_PERCENT,
AnomalyDetectorSettings.CHECKPOINT_WRITE_QUEUE_MAX_HEAP_PERCENT,
AnomalyDetectorSettings.RESULT_WRITE_QUEUE_MAX_HEAP_PERCENT,
AnomalyDetectorSettings.CHECKPOINT_MAINTAIN_QUEUE_MAX_HEAP_PERCENT,
AnomalyDetectorSettings.ENTITY_COLD_START_QUEUE_MAX_HEAP_PERCENT,
AnomalyDetectorSettings.EXPECTED_COLD_ENTITY_EXECUTION_TIME_IN_SECS,
AnomalyDetectorSettings.EXPECTED_CHECKPOINT_MAINTAIN_TIME_IN_SECS,
// query limit
LegacyOpenDistroAnomalyDetectorSettings.MAX_ENTITIES_PER_QUERY,
LegacyOpenDistroAnomalyDetectorSettings.MAX_ENTITIES_FOR_PREVIEW,
77 changes: 66 additions & 11 deletions src/main/java/org/opensearch/ad/caching/CacheBuffer.java
@@ -32,8 +32,12 @@
import org.opensearch.ad.ml.EntityModel;
import org.opensearch.ad.ml.ModelState;
import org.opensearch.ad.model.InitProgressProfile;
import org.opensearch.ad.ratelimit.CheckpointMaintainRequest;
import org.opensearch.ad.ratelimit.CheckpointMaintainWorker;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.ratelimit.RequestPriority;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.DateUtils;

/**
* We use a layered cache to manage active entities’ states. We have a two-level
@@ -75,7 +79,9 @@ public class CacheBuffer implements ExpiringState {
private final PriorityTracker priorityTracker;
private final Clock clock;
private final CheckpointWriteWorker checkpointWriteQueue;
private final CheckpointMaintainWorker checkpointMaintainQueue;
private final Random random;
private final int checkpointIntervalHrs;

public CacheBuffer(
int minimumCapacity,
@@ -86,7 +92,8 @@ public CacheBuffer(
Duration modelTtl,
String detectorId,
CheckpointWriteWorker checkpointWriteQueue,
Random random
Random random,
CheckpointMaintainWorker checkpointMaintainQueue
) {
this.memoryConsumptionPerEntity = memoryConsumptionPerEntity;
setMinimumCapacity(minimumCapacity);
@@ -102,6 +109,8 @@ public CacheBuffer(
this.priorityTracker = new PriorityTracker(clock, intervalSecs, clock.instant().getEpochSecond(), MAX_TRACKING_ENTITIES);
this.checkpointWriteQueue = checkpointWriteQueue;
this.random = random;
this.checkpointMaintainQueue = checkpointMaintainQueue;
this.checkpointIntervalHrs = AnomalyDetectorSettings.CHECKPOINT_SAVING_FREQ.toHoursPart();
}

/**
@@ -181,6 +190,26 @@ public ModelState<EntityModel> get(String key) {
return node;
}

/**
* Retrieve the ModelState associated with the model Id or null if the CacheBuffer
* contains no mapping for the model Id. Compared to the get method, this method
* won't increment entity priority. Used in cache buffer maintenance.
*
* @param key the model Id
* @return the Model state to which the specified model Id is mapped, or null
* if this CacheBuffer contains no mapping for the model Id
*/
public ModelState<EntityModel> getWithoutUpdatePriority(String key) {
// We may get an item that is about to be removed due to a race condition.
// This is acceptable as it won't cause corruption or an exception,
// and the item is used for scoring one last time.
ModelState<EntityModel> node = items.get(key);
if (node == null) {
return null;
}
return node;
}

/**
*
* @return whether there is one item that can be removed from shared cache
@@ -220,6 +249,18 @@ public ModelState<EntityModel> remove() {
* is no associated ModelState for the key
*/
public ModelState<EntityModel> remove(String keyToRemove) {
return remove(keyToRemove, true);
}

/**
* Remove everything associated with the key and make a checkpoint if requested.
*
* @param keyToRemove The key to remove
* @param saveCheckpoint Whether to save a checkpoint for the removed model
* @return the ModelState associated with the key, or null if there
* is no associated ModelState for the key
*/
public ModelState<EntityModel> remove(String keyToRemove, boolean saveCheckpoint) {
priorityTracker.removePriority(keyToRemove);

// if shared cache is empty, we are using reserved memory
@@ -235,11 +276,13 @@ public ModelState<EntityModel> remove(String keyToRemove) {

EntityModel modelRemoved = valueRemoved.getModel();
if (modelRemoved != null) {
// A null model has only samples. For a null model we save a checkpoint
// regardless of the last checkpoint time. If we don't save,
// we throw away the new samples and might never be able to initialize the model
boolean isNullModel = !modelRemoved.getTrcf().isPresent();
checkpointWriteQueue.write(valueRemoved, isNullModel, RequestPriority.MEDIUM);
if (saveCheckpoint) {
// A null model has only samples. For a null model we save a checkpoint
// regardless of the last checkpoint time. If we don't save,
// we throw away the new samples and might never be able to initialize the model
boolean isNullModel = !modelRemoved.getTrcf().isPresent();
checkpointWriteQueue.write(valueRemoved, isNullModel, RequestPriority.MEDIUM);
}

modelRemoved.clear();
}
@@ -304,13 +347,16 @@ public ModelState<EntityModel> replace(String entityModelId, ModelState<EntityMo
* @return removed states
*/
public List<ModelState<EntityModel>> maintenance() {
List<ModelState<EntityModel>> modelsToSave = new ArrayList<>();
List<CheckpointMaintainRequest> modelsToSave = new ArrayList<>();
List<ModelState<EntityModel>> removedStates = new ArrayList<>();
Instant now = clock.instant();
int currentHour = DateUtils.getHourOfDay(now);
int currentSlot = currentHour % checkpointIntervalHrs;
items.entrySet().stream().forEach(entry -> {
String entityModelId = entry.getKey();
try {
ModelState<EntityModel> modelState = entry.getValue();
Instant now = clock.instant();

if (modelState.getLastUsedTime().plus(modelTtl).isBefore(now)) {
// race conditions can happen between the put and one of the following operations:
// remove: not a problem as all of the data structures are concurrent.
Expand All @@ -322,7 +368,7 @@ public List<ModelState<EntityModel>> maintenance() {
// already in the cache
// remove method saves checkpoint as well
removedStates.add(remove(entityModelId));
} else if (random.nextInt(6) == 0) {
} else if (Math.abs(entityModelId.hashCode()) % checkpointIntervalHrs == currentSlot) {
// checkpoint is relatively big compared to other queued requests
// save checkpoints with 1/6 probability as we expect to save
// all every 6 hours statistically
@@ -350,15 +396,24 @@ public List<ModelState<EntityModel>> maintenance() {
// is stale (i.e., we don't recover from the freshest model in disaster.).
//
// All in all, randomness is mostly due to performance and easy maintenance.
modelsToSave.add(modelState);
modelsToSave
.add(
new CheckpointMaintainRequest(
// the request expires when the next maintenance starts
System.currentTimeMillis() + modelTtl.toMillis(),
detectorId,
RequestPriority.LOW,
entityModelId
)
);
}

} catch (Exception e) {
LOG.warn("Failed to finish maintenance for model id " + entityModelId, e);
}
});

checkpointWriteQueue.writeAll(modelsToSave, detectorId, false, RequestPriority.MEDIUM);
checkpointMaintainQueue.putAll(modelsToSave);
return removedStates;
}

8 changes: 6 additions & 2 deletions src/main/java/org/opensearch/ad/caching/CacheProvider.java
@@ -22,12 +22,16 @@
public class CacheProvider implements Provider<EntityCache> {
private EntityCache cache;

public CacheProvider(EntityCache cache) {
this.cache = cache;
public CacheProvider() {

}

@Override
public EntityCache get() {
return cache;
}

public void set(EntityCache cache) {
this.cache = cache;
}
}
15 changes: 15 additions & 0 deletions src/main/java/org/opensearch/ad/caching/EntityCache.java
@@ -139,4 +139,19 @@ Pair<List<Entity>, List<Entity>> selectUpdateCandidate(
* @return the entity's memory size
*/
Optional<ModelProfile> getModelProfile(String detectorId, String entityModelId);

/**
* Get a model state without incurring priority update. Used in maintenance.
* @param detectorId Detector Id
* @param modelId Model Id
* @return Model state
*/
Optional<ModelState<EntityModel>> getForMaintainance(String detectorId, String modelId);

/**
* Remove entity model from active entity buffer and delete checkpoint. Used to clean corrupted model.
* @param detectorId Detector Id
* @param entityModelId Model Id
*/
void removeEntityModel(String detectorId, String entityModelId);
}
50 changes: 48 additions & 2 deletions src/main/java/org/opensearch/ad/caching/PriorityCache.java
@@ -38,6 +38,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.ad.AnomalyDetectorPlugin;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
@@ -51,6 +52,7 @@
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.ad.ratelimit.CheckpointMaintainWorker;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.cluster.service.ClusterService;
@@ -86,6 +88,7 @@ public class PriorityCache implements EntityCache {
// it again and again for no obvious benefits.
private Instant lastInActiveEntityMaintenance;
protected int maintenanceFreqConstant;
private CheckpointMaintainWorker checkpointMaintainQueue;

public PriorityCache(
CheckpointDao checkpointDao,
@@ -99,7 +102,8 @@ public PriorityCache(
Duration modelTtl,
ThreadPool threadPool,
CheckpointWriteWorker checkpointWriteQueue,
int maintenanceFreqConstant
int maintenanceFreqConstant,
CheckpointMaintainWorker checkpointMaintainQueue
) {
this.checkpointDao = checkpointDao;

@@ -131,6 +135,7 @@ public PriorityCache(
this.checkpointWriteQueue = checkpointWriteQueue;
this.lastInActiveEntityMaintenance = Instant.MIN;
this.maintenanceFreqConstant = maintenanceFreqConstant;
this.checkpointMaintainQueue = checkpointMaintainQueue;
}

@Override
@@ -450,7 +455,8 @@ private CacheBuffer computeBufferIfAbsent(AnomalyDetector detector, String detec
modelTtl,
detectorId,
checkpointWriteQueue,
random
random,
checkpointMaintainQueue
);
activeEnities.put(detectorId, buffer);
// There can be race conditions between tryClearUpMemory and
@@ -891,4 +897,44 @@ private void validateDedicatedCacheSize(Integer newDedicatedCacheSize) {
}
}
}

/**
* Get a model state without incurring priority update. Used in maintenance.
* @param detectorId Detector Id
* @param modelId Model Id
* @return Model state
*/
@Override
public Optional<ModelState<EntityModel>> getForMaintainance(String detectorId, String modelId) {
CacheBuffer buffer = activeEnities.get(detectorId);
if (buffer == null) {
return Optional.empty();
}
return Optional.ofNullable(buffer.getWithoutUpdatePriority(modelId));
}

/**
* Remove entity model from active entity buffer and delete checkpoint. Used to clean corrupted model.
* @param detectorId Detector Id
* @param entityModelId Model Id
*/
@Override
public void removeEntityModel(String detectorId, String entityModelId) {
CacheBuffer buffer = activeEnities.get(detectorId);
if (buffer != null) {
ModelState<EntityModel> removed = null;
if ((removed = buffer.remove(entityModelId, false)) != null) {
addIntoInactiveCache(removed);
}
}
checkpointDao
.deleteModelCheckpoint(
entityModelId,
ActionListener
.wrap(
r -> LOG.debug(new ParameterizedMessage("Succeeded in deleting checkpoint [{}].", entityModelId)),
e -> LOG.error(new ParameterizedMessage("Failed to delete checkpoint [{}].", entityModelId), e)
)
);
}
}