Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Circuit breaker service #7

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.ad;

import com.amazon.opendistroforelasticsearch.ad.breaker.ADCircuitBreakerService;
import com.amazon.opendistroforelasticsearch.ad.cluster.ADClusterEventListener;
import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData;
import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData.ADMetaDataDiff;
Expand Down Expand Up @@ -209,11 +210,13 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
AnomalyDetectorSettings.CHECKPOINT_TTL);
HourlyCron hourlyCron = new HourlyCron(clusterService, client);

ADCircuitBreakerService adCircuitBreakerService = new ADCircuitBreakerService(jvmService).init();

return ImmutableList.of(anomalyDetectionIndices, anomalyDetectorRunner, searchFeatureDao,
singleFeatureLinearUniformInterpolator, interpolator, gson, jvmService, hashRing, featureManager,
modelManager, clock, stateManager, runner,
new ADClusterEventListener(clusterService, hashRing, modelManager),
deleteUtil, dailyCron, hourlyCron,
deleteUtil, dailyCron, hourlyCron, adCircuitBreakerService,
new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.breaker;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.monitor.jvm.JvmService;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Class {@code ADCircuitBreakerService} provide storing, retrieving circuit breakers functions.
*
* This service registers internal system breakers and provide API for users to register their own breakers.
*/
public class ADCircuitBreakerService {

private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();
private final JvmService jvmService;

private static final Logger logger = LogManager.getLogger(ADCircuitBreakerService.class);

/**
* Constructor.
*
* @param jvmService jvm info
*/
public ADCircuitBreakerService(JvmService jvmService) {
this.jvmService = jvmService;
}

public void registerBreaker(String name, CircuitBreaker breaker) {
breakers.putIfAbsent(name, breaker);
}

public void unregisterBreaker(String name) {
if (name == null) {
return;
}

breakers.remove(name);
}

public void clearBreakers() {
breakers.clear();
}

public CircuitBreaker getBreaker(String name) {
return breakers.get(name);
}

/**
* Initialize circuit breaker service.
*
* Register memory breaker by default.
*
* @return ADCircuitBreakerService
*/
public ADCircuitBreakerService init() {
//Register memory circuit breaker
registerBreaker(BreakerName.MEM.getName(), new MemoryCircuitBreaker(this.jvmService));
logger.info("Registered memory breaker.");

return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.breaker;

public enum BreakerName {

MEM("memory"),
CPU("cpu");

private String name;

BreakerName(String name) {
this.name = name;
}

public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.breaker;

/**
* An interface for circuit breaker.
*
* We use circuit breaker to protect a certain system resource like memory, cpu etc.
*/
public interface CircuitBreaker {

boolean isOpen();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.breaker;

import org.elasticsearch.monitor.jvm.JvmService;

/**
* A circuit breaker for memory usage.
*/
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {

private static final short defaultThreshold = 85;
private final JvmService jvmService;

public MemoryCircuitBreaker(JvmService jvmService) {
super(defaultThreshold);
this.jvmService = jvmService;
}

public MemoryCircuitBreaker(short threshold, JvmService jvmService) {
super(threshold);
this.jvmService = jvmService;
}

@Override
public boolean isOpen() {
return jvmService.stats().getMem().getHeapUsedPercent() > this.getThreshold();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.breaker;

/**
* An abstract class for all breakers with threshold.
* @param <T> data type of threshold
*/
public abstract class ThresholdCircuitBreaker<T> implements CircuitBreaker {

private T threshold;

public ThresholdCircuitBreaker(T threshold) {
this.threshold = threshold;
}

public T getThreshold() {
return threshold;
}

@Override
public abstract boolean isOpen();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.breaker;

import org.elasticsearch.monitor.jvm.JvmService;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class ADCircuitBreakerServiceTests {

@InjectMocks
private ADCircuitBreakerService adCircuitBreakerService;

@Mock
JvmService jvmService;

@Before
public void setup() {
MockitoAnnotations.initMocks(this);
}

@Test
public void testRegisterBreaker() {
adCircuitBreakerService.registerBreaker(BreakerName.MEM.getName(), new MemoryCircuitBreaker(jvmService));
CircuitBreaker breaker = adCircuitBreakerService.getBreaker(BreakerName.MEM.getName());

assertThat(breaker, is(notNullValue()));
}

@Test
public void testRegisterBreakerNull() {
CircuitBreaker breaker = adCircuitBreakerService.getBreaker(BreakerName.MEM.getName());

assertThat(breaker, is(nullValue()));
}

@Test
public void testUnregisterBreaker() {
adCircuitBreakerService.registerBreaker(BreakerName.MEM.getName(), new MemoryCircuitBreaker(jvmService));
CircuitBreaker breaker = adCircuitBreakerService.getBreaker(BreakerName.MEM.getName());
assertThat(breaker, is(notNullValue()));
adCircuitBreakerService.unregisterBreaker(BreakerName.MEM.getName());
breaker = adCircuitBreakerService.getBreaker(BreakerName.MEM.getName());
assertThat(breaker, is(nullValue()));
}

@Test
public void testUnregisterBreakerNull() {
adCircuitBreakerService.registerBreaker(BreakerName.MEM.getName(), new MemoryCircuitBreaker(jvmService));
adCircuitBreakerService.unregisterBreaker(null);
CircuitBreaker breaker = adCircuitBreakerService.getBreaker(BreakerName.MEM.getName());
assertThat(breaker, is(notNullValue()));
}

@Test
public void testClearBreakers() {
adCircuitBreakerService.registerBreaker(BreakerName.CPU.getName(), new MemoryCircuitBreaker(jvmService));
CircuitBreaker breaker = adCircuitBreakerService.getBreaker(BreakerName.CPU.getName());
assertThat(breaker, is(notNullValue()));
adCircuitBreakerService.clearBreakers();
breaker = adCircuitBreakerService.getBreaker(BreakerName.CPU.getName());
assertThat(breaker, is(nullValue()));
}

@Test
public void testInit() {
assertThat(adCircuitBreakerService.init(), is(notNullValue()));
}

}
Loading