Skip to content

Commit

Permalink
Adds setting to enable/disable Thread Contention Monitoring (#171)
Browse files Browse the repository at this point in the history
* adds setting to enable/disable thread contention monitoring

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* fix log messages

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep committed Apr 11, 2022
1 parent a65ba5c commit 908218e
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@ public class PerformanceAnalyzerController {
// This file should contain "true" or "false", indicating whether batch metrics is currently
// enabled or not.
private static final String BATCH_METRICS_ENABLED_CONF = "batch_metrics_enabled.conf";
private static final String THREAD_CONTENTION_MONITORING_ENABLED_CONF = "thread_contention_monitoring_enabled.conf";
private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerController.class);
public static final int DEFAULT_NUM_OF_SHARDS_PER_COLLECTION = 0;

private boolean paEnabled;
private boolean rcaEnabled;
private boolean loggingEnabled;
private boolean batchMetricsEnabled;
private boolean threadContentionMonitoringEnabled;
private volatile int shardsPerCollection;
private static final boolean paEnabledDefaultValue = false;
private static final boolean rcaEnabledDefaultValue = true;
private static final boolean loggingEnabledDefaultValue = false;
private static final boolean batchMetricsEnabledDefaultValue = false;
private static final boolean threadContentionMonitoringEnabledDefaultValue = false;
private final ScheduledMetricCollectorsExecutor scheduledMetricCollectorsExecutor;

public PerformanceAnalyzerController(
Expand All @@ -52,6 +55,7 @@ public PerformanceAnalyzerController(
initRcaStateFromConf();
initLoggingStateFromConf();
initBatchMetricsStateFromConf();
initThreadContentionMonitoringStateFromConf();
shardsPerCollection = DEFAULT_NUM_OF_SHARDS_PER_COLLECTION;
}

Expand Down Expand Up @@ -88,6 +92,10 @@ public boolean isBatchMetricsEnabled() {
return batchMetricsEnabled;
}

public boolean isThreadContentionMonitoringEnabled() {
return threadContentionMonitoringEnabled;
}

/**
* Reads the shardsPerCollection parameter in NodeStatsMetric
*
Expand Down Expand Up @@ -166,6 +174,23 @@ public void updateBatchMetricsState(final boolean shouldEnable) {
saveStateToConf(this.batchMetricsEnabled, BATCH_METRICS_ENABLED_CONF);
}

/**
* Updates the state of the thread contention monitoring api.
*
* @param shouldEnable The desired state of the thread contention monitoring api. False to disable, and true to
* enable.
*/
public void updateThreadContentionMonitoringState(final boolean shouldEnable) {
if (shouldEnable && !isPerformanceAnalyzerEnabled()) {
return;
}
this.threadContentionMonitoringEnabled = shouldEnable;
if (scheduledMetricCollectorsExecutor != null) {
scheduledMetricCollectorsExecutor.setThreadContentionMonitoringEnabled(threadContentionMonitoringEnabled);
}
saveStateToConf(this.threadContentionMonitoringEnabled, THREAD_CONTENTION_MONITORING_ENABLED_CONF);
}

private void initPerformanceAnalyzerStateFromConf() {
Path filePath = Paths.get(getDataDirectory(), PERFORMANCE_ANALYZER_ENABLED_CONF);
PerformanceAnalyzerPlugin.invokePrivileged(
Expand Down Expand Up @@ -249,6 +274,29 @@ private void initBatchMetricsStateFromConf() {
});
}

private void initThreadContentionMonitoringStateFromConf() {
Path filePath = Paths.get(getDataDirectory(), THREAD_CONTENTION_MONITORING_ENABLED_CONF);
PerformanceAnalyzerPlugin.invokePrivileged(
() -> {
boolean threadContentionMonitoringEnabledFromConf;
try {
threadContentionMonitoringEnabledFromConf = readBooleanFromFile(filePath);
} catch (NoSuchFileException e) {
LOG.debug("Error reading Performance Analyzer state from Conf file", e);
saveStateToConf(
threadContentionMonitoringEnabledDefaultValue, THREAD_CONTENTION_MONITORING_ENABLED_CONF);
threadContentionMonitoringEnabledFromConf = threadContentionMonitoringEnabledDefaultValue;
} catch (Exception e) {
LOG.debug("Error reading thread contention monitoring state from Conf file", e);
threadContentionMonitoringEnabledFromConf = threadContentionMonitoringEnabledDefaultValue;
}

// For thread contention monitoring to be enabled, both PA and thread contention monitoring
// need to enabled.
updateThreadContentionMonitoringState(paEnabled && threadContentionMonitoringEnabledFromConf);
});
}

private boolean readBooleanFromFile(final Path filePath) throws Exception {
try (Scanner sc = new Scanner(filePath)) {
String nextLine = sc.nextLine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public final class PerformanceAnalyzerClusterSettings {
/**
* Cluster setting that controls state for various PA components. Bit 0: Perf Analyzer
* enabled/disabled Bit 1: RCA enabled/disabled Bit 2: Logging enabled/disabled Bit 3: Batch
* Metrics enabled/disabled
* Metrics enabled/disabled Bit 4: Thread Contention Monitoring enabled/disabled
*/
public static final Setting<Integer> COMPOSITE_PA_SETTING =
Setting.intSetting(
Expand All @@ -25,7 +25,8 @@ public enum PerformanceAnalyzerFeatureBits {
PA_BIT,
RCA_BIT,
LOGGING_BIT,
BATCH_METRICS_BIT
BATCH_METRICS_BIT,
THREAD_CONTENTION_MONITORING_BIT
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class PerformanceAnalyzerClusterSettingHandler implements ClusterSettingL
private static final int BATCH_METRICS_ENABLED_BIT_POS =
PerformanceAnalyzerClusterSettings.PerformanceAnalyzerFeatureBits.BATCH_METRICS_BIT
.ordinal();
private static final int THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS =
PerformanceAnalyzerClusterSettings.PerformanceAnalyzerFeatureBits.THREAD_CONTENTION_MONITORING_BIT
.ordinal();

private final PerformanceAnalyzerController controller;
private final ClusterSettingsManager clusterSettingsManager;
Expand All @@ -45,7 +48,8 @@ public PerformanceAnalyzerClusterSettingHandler(
controller.isPerformanceAnalyzerEnabled(),
controller.isRcaEnabled(),
controller.isLoggingEnabled(),
controller.isBatchMetricsEnabled());
controller.isBatchMetricsEnabled(),
controller.isThreadContentionMonitoringEnabled());
}

/**
Expand Down Expand Up @@ -92,6 +96,17 @@ public void updateBatchMetricsSetting(final boolean state) {
PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING, settingIntValue);
}

/**
* Updates the Thread Contention Monitoring setting across the cluster.
*
* @param state The desired state for Thread Contention Monitoring setting.
*/
public void updateThreadContentionMonitoringSetting(final boolean state) {
final Integer settingIntValue = getThreadContentionMonitoringSettingValueFromState(state);
clusterSettingsManager.updateSetting(
PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING, settingIntValue);
}

/**
* Handler that gets called when there is a new value for the setting that this listener is
* listening to.
Expand All @@ -106,6 +121,7 @@ public void onSettingUpdate(final Integer newSettingValue) {
controller.updateRcaState(getRcaStateFromSetting(newSettingValue));
controller.updateLoggingState(getLoggingStateFromSetting(newSettingValue));
controller.updateBatchMetricsState(getBatchMetricsStateFromSetting(newSettingValue));
controller.updateThreadContentionMonitoringState(getThreadContentionMonitoringStateFromSetting(newSettingValue));
}
}

Expand All @@ -130,7 +146,8 @@ private Integer initializeClusterSettingValue(
final boolean paEnabled,
final boolean rcaEnabled,
final boolean loggingEnabled,
final boolean batchMetricsEnabled) {
final boolean batchMetricsEnabled,
final boolean threadContentionMonitoringEnabled) {
int clusterSetting = CLUSTER_SETTING_DISABLED_VALUE;

clusterSetting = paEnabled ? setBit(clusterSetting, PA_ENABLED_BIT_POS) : clusterSetting;
Expand All @@ -145,6 +162,10 @@ private Integer initializeClusterSettingValue(
batchMetricsEnabled
? setBit(clusterSetting, BATCH_METRICS_ENABLED_BIT_POS)
: clusterSetting;
clusterSetting =
threadContentionMonitoringEnabled
? setBit(clusterSetting, THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS)
: clusterSetting;
}
return clusterSetting;
}
Expand All @@ -161,10 +182,10 @@ private boolean getPAStateFromSetting(final int settingValue) {

/**
* Converts the boolean PA state to composite cluster setting. If Performance Analyzer is being
* turned off, it will also turn off RCA, logging, and batch metrics.
* turned off, it will also turn off RCA, logging, batch metrics and thread contention monitoring.
*
* @param state the state of performance analyzer. Will enable performance analyzer if true,
* disables performance analyzer, RCA, logging, and batch metrics.
* disables performance analyzer, RCA, logging, batch metrics and thread contention monitoring.
* @return composite cluster setting as an integer.
*/
private Integer getPASettingValueFromState(final boolean state) {
Expand All @@ -176,10 +197,12 @@ private Integer getPASettingValueFromState(final boolean state) {
return resetBit(
resetBit(
resetBit(
resetBit(clusterSetting, PA_ENABLED_BIT_POS),
RCA_ENABLED_BIT_POS),
LOGGING_ENABLED_BIT_POS),
BATCH_METRICS_ENABLED_BIT_POS);
resetBit(
resetBit(clusterSetting, PA_ENABLED_BIT_POS),
RCA_ENABLED_BIT_POS),
LOGGING_ENABLED_BIT_POS),
BATCH_METRICS_ENABLED_BIT_POS),
THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS);
}
}

Expand Down Expand Up @@ -213,6 +236,16 @@ private boolean getBatchMetricsStateFromSetting(final int settingValue) {
return ((settingValue >> BATCH_METRICS_ENABLED_BIT_POS) & BIT_ONE) == ENABLED_VALUE;
}

/**
* Extracts the boolean value for thread contention monitoring state from the cluster setting.
*
* @param settingValue The composite setting value.
* @return true if the THREAD_CONTENTION_MONITORING bit is set, false otherwise.
*/
private boolean getThreadContentionMonitoringStateFromSetting(final int settingValue) {
return ((settingValue >> THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS) & BIT_ONE) == ENABLED_VALUE;
}

/**
* Converts the boolean RCA state to composite cluster setting. Enables RCA only if performance
* analyzer is also set. Otherwise, results in a no-op.
Expand Down Expand Up @@ -272,6 +305,26 @@ private Integer getBatchMetricsSettingValueFromState(final boolean shouldEnable)
}
}

/**
* Converts the boolean thread contention monitoring state to composite cluster setting. Enables thread contention
* monitoring only if performance analyzer is also set. Otherwise, results in a no-op.
*
* @param shouldEnable the state of thread contention monitoring. Will try to enable if true, disables thread
* contention monitoring if false.
* @return composite cluster setting as an integer.
*/
private Integer getThreadContentionMonitoringSettingValueFromState(final boolean shouldEnable) {
int clusterSetting = currentClusterSetting;

if (shouldEnable) {
return checkBit(currentClusterSetting, PA_ENABLED_BIT_POS)
? setBit(clusterSetting, THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS)
: clusterSetting;
} else {
return resetBit(clusterSetting, THREAD_CONTENTION_MONITORING_ENABLED_BIT_POS);
}
}

/**
* Sets the bit at the specified position.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class PerformanceAnalyzerClusterConfigAction extends BaseRestHandler {
RestConfig.PA_BASE_URI + "/logging/cluster/config";
public static final String BATCH_METRICS_CLUSTER_CONFIG_PATH =
RestConfig.PA_BASE_URI + "/batch/cluster/config";
public static final String THREAD_CONTENTION_MONITORING_CLUSTER_CONFIG_PATH =
RestConfig.PA_BASE_URI + "/threadContentionMonitoring/cluster/config";

public static final String LEGACY_PA_CLUSTER_CONFIG_PATH =
RestConfig.LEGACY_PA_BASE_URI + "/cluster/config";
Expand All @@ -59,6 +61,16 @@ public class PerformanceAnalyzerClusterConfigAction extends BaseRestHandler {
public static final String LEGACY_BATCH_METRICS_CLUSTER_CONFIG_PATH =
RestConfig.LEGACY_PA_BASE_URI + "/batch/cluster/config";

private static final List<Route> ROUTES =
unmodifiableList(
asList(
new Route(
RestRequest.Method.GET,
THREAD_CONTENTION_MONITORING_CLUSTER_CONFIG_PATH),
new Route(
RestRequest.Method.POST,
THREAD_CONTENTION_MONITORING_CLUSTER_CONFIG_PATH)
));
private static final List<ReplacedRoute> REPLACED_ROUTES =
unmodifiableList(
asList(
Expand Down Expand Up @@ -126,7 +138,7 @@ public String getName() {

@Override
public List<Route> routes() {
return Collections.emptyList();
return ROUTES;
}

@Override
Expand Down Expand Up @@ -169,6 +181,8 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
} else if (request.path().contains(BATCH_METRICS_CLUSTER_CONFIG_PATH)
|| request.path().contains(LEGACY_BATCH_METRICS_CLUSTER_CONFIG_PATH)) {
clusterSettingHandler.updateBatchMetricsSetting((Boolean) value);
} else if (request.path().contains(THREAD_CONTENTION_MONITORING_CLUSTER_CONFIG_PATH)) {
clusterSettingHandler.updateThreadContentionMonitoringSetting((Boolean) value);
} else {
clusterSettingHandler.updatePerformanceAnalyzerSetting((Boolean) value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class PerformanceAnalyzerConfigAction extends BaseRestHandler {
public static final String RCA_ENABLED = "rcaEnabled";
public static final String PA_LOGGING_ENABLED = "loggingEnabled";
public static final String BATCH_METRICS_ENABLED = "batchMetricsEnabled";
public static final String THREAD_CONTENTION_MONITORING_ENABLED = "threadContentionMonitoringEnabled";
public static final String BATCH_METRICS_RETENTION_PERIOD_MINUTES =
"batchMetricsRetentionPeriodMinutes";
public static final String PERFORMANCE_ANALYZER_CONFIG_ACTION =
Expand All @@ -48,6 +49,8 @@ public class PerformanceAnalyzerConfigAction extends BaseRestHandler {
public static final String PA_CONFIG_PATH = RestConfig.PA_BASE_URI + "/config";
public static final String LOGGING_CONFIG_PATH = RestConfig.PA_BASE_URI + "/logging/config";
public static final String BATCH_METRICS_CONFIG_PATH = RestConfig.PA_BASE_URI + "/batch/config";
public static final String THREAD_CONTENTION_MONITORING_CONFIG_PATH =
RestConfig.PA_BASE_URI + "/threadContentionMonitoring/config";

public static final String LEGACY_RCA_CONFIG_PATH =
RestConfig.LEGACY_PA_BASE_URI + "/rca/config";
Expand All @@ -56,7 +59,16 @@ public class PerformanceAnalyzerConfigAction extends BaseRestHandler {
RestConfig.LEGACY_PA_BASE_URI + "/logging/config";
public static final String LEGACY_BATCH_METRICS_CONFIG_PATH =
RestConfig.LEGACY_PA_BASE_URI + "/batch/config";

private static final List<Route> ROUTES =
unmodifiableList(
asList(
new Route(
RestRequest.Method.GET,
THREAD_CONTENTION_MONITORING_CONFIG_PATH),
new Route(
RestRequest.Method.POST,
THREAD_CONTENTION_MONITORING_CONFIG_PATH)
));
private static final List<ReplacedRoute> REPLACED_ROUTES =
unmodifiableList(
asList(
Expand Down Expand Up @@ -112,7 +124,7 @@ public static void setInstance(

@Override
public List<Route> routes() {
return Collections.emptyList();
return ROUTES;
}

@Override
Expand Down Expand Up @@ -172,12 +184,20 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
}

performanceAnalyzerController.updateBatchMetricsState(shouldEnable);
} else if (request.path().contains(THREAD_CONTENTION_MONITORING_CONFIG_PATH)) {
if (shouldEnable
&& !performanceAnalyzerController.isPerformanceAnalyzerEnabled()) {
return getChannelConsumerWithError(
"Error: PA not enabled. Enable PA before turning thread contention monitoring on");
}
performanceAnalyzerController.updateThreadContentionMonitoringState(shouldEnable);
} else {
// Disabling Performance Analyzer should disable the RCA framework as well.
if (!shouldEnable) {
performanceAnalyzerController.updateRcaState(false);
performanceAnalyzerController.updateLoggingState(false);
performanceAnalyzerController.updateBatchMetricsState(false);
performanceAnalyzerController.updateThreadContentionMonitoringState(false);
}
performanceAnalyzerController.updatePerformanceAnalyzerState(shouldEnable);
}
Expand Down Expand Up @@ -206,6 +226,9 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
builder.field(
BATCH_METRICS_ENABLED,
performanceAnalyzerController.isBatchMetricsEnabled());
builder.field(
THREAD_CONTENTION_MONITORING_ENABLED,
performanceAnalyzerController.isThreadContentionMonitoringEnabled());
builder.field(
BATCH_METRICS_RETENTION_PERIOD_MINUTES,
PluginSettings.instance().getBatchMetricsRetentionPeriodMinutes());
Expand Down
Loading

0 comments on commit 908218e

Please sign in to comment.