Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add checkpoint index retention for multi entity detector #283

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;

@Deprecated
public class DailyCron implements Runnable {
private static final Logger LOG = LogManager.getLogger(DailyCron.class);
protected static final String FIELD_MODEL = "queue";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool;

import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.IndexCleanup;
import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.google.common.annotations.VisibleForTesting;

public class MasterEventListener implements LocalNodeMasterListener {

private Cancellable dailyCron;
private Cancellable checkpointIndexRetentionCron;
private Cancellable hourlyCron;
private ClusterService clusterService;
private ThreadPool threadPool;
Expand Down Expand Up @@ -70,18 +73,19 @@ public void beforeStop() {
});
}

if (dailyCron == null) {
dailyCron = threadPool
if (checkpointIndexRetentionCron == null) {
IndexCleanup indexCleanup = new IndexCleanup(client, clientUtil, clusterService);
checkpointIndexRetentionCron = threadPool
.scheduleWithFixedDelay(
new DailyCron(clock, AnomalyDetectorSettings.CHECKPOINT_TTL, clientUtil),
new ModelCheckpointIndexRetention(AnomalyDetectorSettings.CHECKPOINT_TTL, clock, indexCleanup),
TimeValue.timeValueHours(24),
executorName()
);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
cancel(dailyCron);
dailyCron = null;
cancel(checkpointIndexRetentionCron);
checkpointIndexRetentionCron = null;
}
});
}
Expand All @@ -90,9 +94,9 @@ public void beforeStop() {
@Override
public void offMaster() {
cancel(hourlyCron);
cancel(dailyCron);
cancel(checkpointIndexRetentionCron);
hourlyCron = null;
dailyCron = null;
checkpointIndexRetentionCron = null;
}

private void cancel(Cancellable cron) {
Expand All @@ -101,11 +105,12 @@ private void cancel(Cancellable cron) {
}
}

public Cancellable getDailyCron() {
return dailyCron;
@VisibleForTesting
protected Cancellable getCheckpointIndexRetentionCron() {
return checkpointIndexRetentionCron;
}

public Cancellable getHourlyCron() {
protected Cancellable getHourlyCron() {
return hourlyCron;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup;

import java.util.Arrays;
import java.util.Objects;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.store.StoreStats;

import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;

/**
* Clean up the old docs for indices.
*/
public class IndexCleanup {
private static final Logger LOG = LogManager.getLogger(IndexCleanup.class);

private final Client client;
private final ClientUtil clientUtil;
private final ClusterService clusterService;

public IndexCleanup(Client client, ClientUtil clientUtil, ClusterService clusterService) {
this.client = client;
this.clientUtil = clientUtil;
this.clusterService = clusterService;
}

/**
* delete docs when shard size is bigger than max limitation.
* @param indexName index name
* @param maxShardSize max shard size
* @param queryForDeleteByQueryRequest query request
* @param listener action listener
*/
public void deleteDocsBasedOnShardSize(
String indexName,
long maxShardSize,
QueryBuilder queryForDeleteByQueryRequest,
ActionListener<Boolean> listener
) {

if (!clusterService.state().getRoutingTable().hasIndex(indexName)) {
LOG.debug("skip as the index:{} doesn't exist", indexName);
return;
}

ActionListener<IndicesStatsResponse> indicesStatsResponseListener = ActionListener.wrap(indicesStatsResponse -> {
// Check if any shard size is bigger than maxShardSize
boolean cleanupNeeded = Arrays
.stream(indicesStatsResponse.getShards())
.map(ShardStats::getStats)
.filter(Objects::nonNull)
.map(CommonStats::getStore)
.filter(Objects::nonNull)
.map(StoreStats::getSizeInBytes)
.anyMatch(size -> size > maxShardSize);

if (cleanupNeeded) {
deleteDocsByQuery(
indexName,
queryForDeleteByQueryRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)
);
} else {
listener.onResponse(false);
}
}, listener::onFailure);

getCheckpointShardStoreStats(indexName, indicesStatsResponseListener);
}

private void getCheckpointShardStoreStats(String indexName, ActionListener<IndicesStatsResponse> listener) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.store();
indicesStatsRequest.indices(indexName);
client.admin().indices().stats(indicesStatsRequest, listener);
}

/**
* Delete docs based on query request
* @param indexName index name
* @param queryForDeleteByQueryRequest query request
* @param listener action listener
*/
public void deleteDocsByQuery(String indexName, QueryBuilder queryForDeleteByQueryRequest, ActionListener<Long> listener) {
DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indexName)
.setQuery(queryForDeleteByQueryRequest)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.setRefresh(true);

try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
clientUtil.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> {
// if 0 docs get deleted, it means our query cannot find any matching doc
LOG.info("{} docs are deleted for index:{}", response.getDeleted(), indexName);
listener.onResponse(response.getDeleted());
}, listener::onFailure));
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup;

import java.time.Clock;
import java.time.Duration;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.query.QueryBuilders;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao;

/**
* Model checkpoints cleanup of multi-entity detectors.
* <p> <b>Problem:</b>
* In multi-entity detectors, we can have thousands, even millions of entities, of which the model checkpoints will consume
* lots of disk resources. To protect the our disk usage, the checkpoint index size will be limited with specified threshold.
* Once its size exceeds the threshold, the model checkpoints cleanup process will be activated.
* </p>
* <p> <b>Solution:</b>
* Before multi-entity detectors, there is daily cron job to clean up the inactive checkpoints longer than some configurable days.
* We will keep the this logic, and add new clean up way based on shard size.
* </p>
*/
public class ModelCheckpointIndexRetention implements Runnable {
private static final Logger LOG = LogManager.getLogger(ModelCheckpointIndexRetention.class);

// The recommended max shard size is 50G, we don't wanna our index exceeds this number
private static final long MAX_SHARD_SIZE_IN_BYTE = 50 * 1024 * 1024 * 1024L;
// We can't clean up all of the checkpoints. At least keep models for 1 day
private static final Duration MINIMUM_CHECKPOINT_TTL = Duration.ofDays(1);

private final Duration defaultCheckpointTtl;
private final Clock clock;
private final IndexCleanup indexCleanup;

public ModelCheckpointIndexRetention(Duration defaultCheckpointTtl, Clock clock, IndexCleanup indexCleanup) {
this.defaultCheckpointTtl = defaultCheckpointTtl;
this.clock = clock;
this.indexCleanup = indexCleanup;
}

@Override
public void run() {
indexCleanup
.deleteDocsByQuery(
CommonName.CHECKPOINT_INDEX_NAME,
QueryBuilders
.boolQuery()
.filter(
QueryBuilders
.rangeQuery(CheckpointDao.TIMESTAMP)
.lte(clock.millis() - defaultCheckpointTtl.toMillis())
.format(CommonName.EPOCH_MILLIS_FORMAT)
),
ActionListener
.wrap(
response -> { cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); },
// The docs will be deleted in next scheduled windows. No need for retrying.
exception -> LOG.error("delete docs by query fails for checkpoint index", exception)
weicongs-amazon marked this conversation as resolved.
Show resolved Hide resolved
)
);

}

private void cleanupBasedOnShardSize(Duration cleanUpTtl) {
indexCleanup
.deleteDocsBasedOnShardSize(
CommonName.CHECKPOINT_INDEX_NAME,
MAX_SHARD_SIZE_IN_BYTE,
QueryBuilders
.boolQuery()
.filter(
QueryBuilders
.rangeQuery(CheckpointDao.TIMESTAMP)
.lte(clock.millis() - cleanUpTtl.toMillis())
.format(CommonName.EPOCH_MILLIS_FORMAT)
),
ActionListener.wrap(cleanupNeeded -> {
if (cleanupNeeded) {
if (cleanUpTtl.equals(MINIMUM_CHECKPOINT_TTL)) {
return;
}

Duration nextCleanupTtl = cleanUpTtl.minusDays(1);
weicongs-amazon marked this conversation as resolved.
Show resolved Hide resolved
if (nextCleanupTtl.compareTo(MINIMUM_CHECKPOINT_TTL) < 0) {
nextCleanupTtl = MINIMUM_CHECKPOINT_TTL;
}
cleanupBasedOnShardSize(nextCleanupTtl);
} else {
LOG.debug("clean up not needed anymore for checkpoint index");
}
},
// The docs will be deleted in next scheduled windows. No need for retrying.
exception -> LOG.error("checkpoint index retention based on shard size fails", exception)
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.junit.Before;

import com.amazon.opendistroforelasticsearch.ad.AbstractADTest;
import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
Expand All @@ -46,7 +47,7 @@ public class MasterEventListenerTests extends AbstractADTest {
private Client client;
private Clock clock;
private Cancellable hourlyCancellable;
private Cancellable dailyCancellable;
private Cancellable checkpointIndexRetentionCancellable;
private MasterEventListener masterService;
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;
Expand All @@ -58,10 +59,11 @@ public void setUp() throws Exception {
clusterService = mock(ClusterService.class);
threadPool = mock(ThreadPool.class);
hourlyCancellable = mock(Cancellable.class);
dailyCancellable = mock(Cancellable.class);
checkpointIndexRetentionCancellable = mock(Cancellable.class);
when(threadPool.scheduleWithFixedDelay(any(HourlyCron.class), any(TimeValue.class), any(String.class)))
.thenReturn(hourlyCancellable);
when(threadPool.scheduleWithFixedDelay(any(DailyCron.class), any(TimeValue.class), any(String.class))).thenReturn(dailyCancellable);
when(threadPool.scheduleWithFixedDelay(any(ModelCheckpointIndexRetention.class), any(TimeValue.class), any(String.class)))
.thenReturn(checkpointIndexRetentionCancellable);
client = mock(Client.class);
clock = mock(Clock.class);
clientUtil = mock(ClientUtil.class);
Expand All @@ -75,11 +77,11 @@ public void setUp() throws Exception {
public void testOnOffMaster() {
masterService.onMaster();
assertThat(hourlyCancellable, is(notNullValue()));
assertThat(dailyCancellable, is(notNullValue()));
assertThat(checkpointIndexRetentionCancellable, is(notNullValue()));
assertTrue(!masterService.getHourlyCron().isCancelled());
assertTrue(!masterService.getDailyCron().isCancelled());
assertTrue(!masterService.getCheckpointIndexRetentionCron().isCancelled());
masterService.offMaster();
assertThat(masterService.getDailyCron(), is(nullValue()));
assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(masterService.getHourlyCron(), is(nullValue()));
}

Expand All @@ -100,10 +102,10 @@ public void testBeforeStop() {
}).when(clusterService).addLifecycleListener(any());

masterService.onMaster();
assertThat(masterService.getDailyCron(), is(nullValue()));
assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(masterService.getHourlyCron(), is(nullValue()));
masterService.offMaster();
assertThat(masterService.getDailyCron(), is(nullValue()));
assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(masterService.getHourlyCron(), is(nullValue()));
}
}
Loading