Skip to content

Commit

Permalink
[Backport 2.x] Prevent message collection from being updated after me…
Browse files Browse the repository at this point in the history
…ssage count has been received (#2180) (#3035)

* Prevent message collection from being updated after message count has been received (#2180)

Also adds mechanism to detect if messages were missed so tests can be updated to appropriate counts.

Signed-off-by: Peter Nied <petern@amazon.com>
(cherry picked from commit ba9d82e)

* Fixes failing citest task

Signed-off-by: Darshit Chanpura <dchanp@amazon.com>

* Fixes spotlessChecks

Signed-off-by: Darshit Chanpura <dchanp@amazon.com>

* Fixes test assertions to reflect correct number

Signed-off-by: Darshit Chanpura <dchanp@amazon.com>

---------

Signed-off-by: Darshit Chanpura <dchanp@amazon.com>
Co-authored-by: Peter Nied <petern@amazon.com>
Co-authored-by: Craig Perkins <cwperx@amazon.com>
  • Loading branch information
3 people committed Jul 21, 2023
1 parent a0ac52b commit 89df7b4
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,18 @@ public String getExceptionStackTrace() {
return (String) this.auditInfo.get(EXCEPTION);
}

public String getRequestBody() {
return (String) this.auditInfo.get(REQUEST_BODY);
}

public String getNodeId() {
return (String) this.auditInfo.get(NODE_ID);
}

public String getDocId() {
return (String) this.auditInfo.get(ID);
}

@Override
public String toString() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.opensearch.security.test.helper.rest.RestHelper.HttpResponse;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.AnyOf.anyOf;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -90,10 +92,11 @@ public void testSourceFilter() throws Exception {
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
});

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_DOC_READ"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Designation"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Salary"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("Gender"));
assertThat(message.getCategory(), equalTo(AuditCategory.COMPLIANCE_DOC_READ));
assertThat(message.getRequestBody(), not(containsString("Designation")));
assertThat(message.getRequestBody(), not(containsString("Salary")));
assertThat(message.getRequestBody(), containsString("Gender"));

Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));
}

Expand Down Expand Up @@ -223,17 +226,26 @@ public void testSourceFilterMsearch() throws Exception {
+ "}"
+ System.lineSeparator();

TestAuditlogImpl.doThenWaitForMessages(() -> {
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
HttpResponse response = rh.executePostRequest("_msearch?pretty", search, encodeBasicHeader("admin", "admin"));
assertNotContains(response, "*exception*");
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
}, 2);
System.out.println(TestAuditlogImpl.sb.toString());
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_DOC_READ"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Salary"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("Gender"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("Designation"));
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));

final AuditMessage desginationMsg = messages.stream()
.filter(msg -> msg.getRequestBody().contains("Designation"))
.findFirst()
.orElseThrow();
assertThat(desginationMsg.getCategory(), equalTo(AuditCategory.COMPLIANCE_DOC_READ));
assertThat(desginationMsg.getRequestBody(), containsString("Designation"));
assertThat(desginationMsg.getRequestBody(), not(containsString("Salary")));

final AuditMessage genderMsg = messages.stream().filter(msg -> msg.getRequestBody().contains("Gender")).findFirst().orElseThrow();
assertThat(genderMsg.getCategory(), equalTo(AuditCategory.COMPLIANCE_DOC_READ));
assertThat(genderMsg.getRequestBody(), containsString("Gender"));
assertThat(genderMsg.getRequestBody(), not(containsString("Salary")));

Assert.assertTrue(validateMsgs(messages));
}

@Test
Expand All @@ -253,6 +265,15 @@ public void testInternalConfig() throws Exception {

setup(additionalSettings);

final List<String> expectedDocumentsTypes = List.of(
"config",
"actiongroups",
"internalusers",
"roles",
"rolesmapping",
"tenants",
"audit"
);
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
try (RestHighLevelClient restHighLevelClient = getRestClient(clusterInfo, "kirk-keystore.jks", "truststore.jks")) {
for (IndexRequest ir : new DynamicSecurityConfig().setSecurityRoles("roles_2.yml").getDynamicConfig(getResourceFolder())) {
Expand All @@ -268,23 +289,20 @@ public void testInternalConfig() throws Exception {
assertThat(response.getStatusCode(), equalTo(HttpStatus.SC_OK));
}, 14);

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_INTERNAL_CONFIG_READ"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_INTERNAL_CONFIG_WRITE"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("anonymous_auth_enabled"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("indices:data/read/suggest"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("internalusers"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("opendistro_security_all_access"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("indices:data/read/suggest"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJzZWFyY2hndWFy"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJBTEwiOlsiaW"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJhZG1pbiI6e"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJzZ19hb"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJzZ19hbGx"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("dvcmYiOnsiY2x"));
Assert.assertTrue(
TestAuditlogImpl.sb.toString().contains("\\\"op\\\":\\\"remove\\\",\\\"path\\\":\\\"/opendistro_security_worf\\\"")
);
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));
final List<String> documentIds = messages.stream().map(AuditMessage::getDocId).distinct().collect(Collectors.toList());
assertThat(documentIds, equalTo(expectedDocumentsTypes));

messages.stream().collect(Collectors.groupingBy(AuditMessage::getDocId)).entrySet().forEach((e) -> {
final String docId = e.getKey();
final List<AuditMessage> messagesByDocId = e.getValue();
assertThat(
"Doc " + docId + " should have a read/write config message",
messagesByDocId.stream().map(AuditMessage::getCategory).collect(Collectors.toList()),
equalTo(List.of(AuditCategory.COMPLIANCE_INTERNAL_CONFIG_WRITE, AuditCategory.COMPLIANCE_INTERNAL_CONFIG_READ))
);
});

Assert.assertTrue(validateMsgs(messages));
}

@Test
Expand All @@ -301,7 +319,7 @@ public void testExternalConfig() throws Exception {
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_CONFIG_DISABLED_REST_CATEGORIES, "authenticated,GRANTED_PRIVILEGES")
.build();

TestAuditlogImpl.doThenWaitForMessages(() -> {
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
try {
setup(additionalSettings);
} catch (final Exception ex) {
Expand All @@ -318,10 +336,17 @@ public void testExternalConfig() throws Exception {
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
}, 4);

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("external_configuration"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_EXTERNAL_CONFIG"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("opensearch_yml"));
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));
// Record the updated config, and then for each node record that the config was updated
assertThat(messages.get(0).getCategory(), equalTo(AuditCategory.COMPLIANCE_INTERNAL_CONFIG_WRITE));
assertThat(messages.get(1).getCategory(), equalTo(AuditCategory.COMPLIANCE_EXTERNAL_CONFIG));
assertThat(messages.get(2).getCategory(), equalTo(AuditCategory.COMPLIANCE_EXTERNAL_CONFIG));
assertThat(messages.get(3).getCategory(), equalTo(AuditCategory.COMPLIANCE_EXTERNAL_CONFIG));

// Make sure that the config update messsages are for each node in the cluster
assertThat(messages.get(1).getNodeId(), not(equalTo(messages.get(2).getNodeId())));
assertThat(messages.get(2).getNodeId(), not(equalTo(messages.get(3).getNodeId())));

Assert.assertTrue(validateMsgs(messages));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void testSSLPlainText() throws Exception {
() -> nonSslRestHelper().executeGetRequest("_search", encodeBasicHeader("admin", "admin"))
);
Assert.assertEquals("org.apache.http.NoHttpResponseException", ex.getCause().getClass().getName());
}, 4);
}, 1);

// All of the messages should be the same as the http client is attempting multiple times.
messages.stream().forEach((message) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,63 @@ public static synchronized void clear() {
* Perform an action and then wait until the expected number of messages have been found.
*/
public static List<AuditMessage> doThenWaitForMessages(final Runnable action, final int expectedCount) {
final CountDownLatch latch = new CountDownLatch(expectedCount);
final List<AuditMessage> missedMessages = new ArrayList<>();
final List<AuditMessage> messages = new ArrayList<>();
countDownRef.set(latch);
messagesRef.set(messages);

TestAuditlogImpl.sb = new StringBuffer();
TestAuditlogImpl.messages = messages;
final CountDownLatch latch = resetAuditStorage(expectedCount, messages);

try {
action.run();
final int maxSecondsToWaitForMessages = 1;
final boolean foundAll = latch.await(maxSecondsToWaitForMessages, TimeUnit.SECONDS);
if (!foundAll) {
boolean foundAll = false;
foundAll = latch.await(maxSecondsToWaitForMessages, TimeUnit.SECONDS);
// After the wait has prevent any new messages from being recieved
resetAuditStorage(0, missedMessages);
if (!foundAll || messages.size() != expectedCount) {
throw new MessagesNotFoundException(expectedCount, messages);
}
if (messages.size() != expectedCount) {
throw new RuntimeException(
"Unexpected number of messages, was expecting " + expectedCount + ", received " + messages.size()
);
}
} catch (final InterruptedException e) {
throw new RuntimeException("Unexpected exception", e);
}

// Do not check for missed messages if no messages were expected
if (expectedCount != 0) {
try {
Thread.sleep(100);
if (missedMessages.size() != 0) {
final String missedMessagesErrorMessage = new StringBuilder().append("Audit messages were missed! ")
.append("Found " + (missedMessages.size()) + " messages.")
.append("Messages found during this time: \n\n")
.append(missedMessages.stream().map(AuditMessage::toString).collect(Collectors.joining("\n")))
.toString();

throw new RuntimeException(missedMessagesErrorMessage);
}
} catch (final Exception e) {
throw new RuntimeException("Unexpected exception", e);
}
}

// Next usage of this class might be using raw stringbuilder / list so reset before that test might run
resetAuditStorage(0, new ArrayList<>());
return new ArrayList<>(messages);
}

/**
* Resets all of the mechanics for fresh messages to be captured
*
* @param expectedMessageCount The number of messages before the latch is signalled, indicating all messages have been recieved
* @param message Where messages will be stored after being recieved
*/
private static CountDownLatch resetAuditStorage(int expectedMessageCount, List<AuditMessage> messages) {
final CountDownLatch latch = new CountDownLatch(expectedMessageCount);
countDownRef.set(latch);
messagesRef.set(messages);

TestAuditlogImpl.sb = new StringBuffer();
TestAuditlogImpl.messages = messages;
return latch;
}

/**
* Perform an action and then wait until a single message has been found.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ protected final CloseableHttpClient getHTTPClient() throws Exception {

hcb.setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(60 * 1000).build());

return hcb.build();
return hcb.disableAutomaticRetries().build();
}

public static class HttpResponse {
Expand Down

0 comments on commit 89df7b4

Please sign in to comment.