Skip to content

Commit

Permalink
Prevent message collection from being updated after message count has…
Browse files Browse the repository at this point in the history
… been received (opensearch-project#2180)

Also adds mechanism to detect if messages were missed so tests can be updated to appropriate counts.

Signed-off-by: Peter Nied <petern@amazon.com>
(cherry picked from commit ba9d82e)
  • Loading branch information
peternied authored and DarshitChanpura committed Jul 19, 2023
1 parent e2a3a38 commit 3cbe6d5
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,15 +451,27 @@ public String getExceptionStackTrace() {
return (String) this.auditInfo.get(EXCEPTION);
}

@Override
public String toString() {
try {
return org.opensearch.common.Strings.toString(JsonXContent.contentBuilder().map(getAsMap()));
} catch (final IOException e) {
throw ExceptionsHelper.convertToOpenSearchException(e);
}
public String getRequestBody() {
return (String) this.auditInfo.get(REQUEST_BODY);
}

public String getNodeId() {
return (String) this.auditInfo.get(NODE_ID);
}

public String getDocId() {
return (String) this.auditInfo.get(ID);
}

@Override
public String toString() {
try {
return org.opensearch.common.Strings.toString(JsonXContent.contentBuilder().map(getAsMap()));
} catch (final IOException e) {
throw ExceptionsHelper.convertToOpenSearchException(e);
}
}

public String toPrettyString() {
try {
return org.opensearch.common.Strings.toString(JsonXContent.contentBuilder().prettyPrint().map(getAsMap()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.opensearch.security.test.helper.rest.RestHelper.HttpResponse;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.AnyOf.anyOf;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -90,10 +92,11 @@ public void testSourceFilter() throws Exception {
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
});

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_DOC_READ"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Designation"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Salary"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("Gender"));
assertThat(message.getCategory(), equalTo(AuditCategory.COMPLIANCE_DOC_READ));
assertThat(message.getRequestBody(), not(containsString("Designation")));
assertThat(message.getRequestBody(), not(containsString("Salary")));
assertThat(message.getRequestBody(), containsString("Gender"));

Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));
}

Expand Down Expand Up @@ -223,17 +226,24 @@ public void testSourceFilterMsearch() throws Exception {
+ "}"
+ System.lineSeparator();

TestAuditlogImpl.doThenWaitForMessages(() -> {
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
HttpResponse response = rh.executePostRequest("_msearch?pretty", search, encodeBasicHeader("admin", "admin"));
assertNotContains(response, "*exception*");
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
}, 2);
System.out.println(TestAuditlogImpl.sb.toString());
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_DOC_READ"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("Salary"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("Gender"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("Designation"));
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));


final AuditMessage desginationMsg = messages.stream().filter(msg -> msg.getRequestBody().contains("Designation")).findFirst().orElseThrow();
assertThat(desginationMsg.getCategory(), equalTo(AuditCategory.COMPLIANCE_DOC_READ));
assertThat(desginationMsg.getRequestBody(), containsString("Designation"));
assertThat(desginationMsg.getRequestBody(), not(containsString("Salary")));

final AuditMessage genderMsg = messages.stream().filter(msg -> msg.getRequestBody().contains("Gender")).findFirst().orElseThrow();
assertThat(genderMsg.getCategory(), equalTo(AuditCategory.COMPLIANCE_DOC_READ));
assertThat(genderMsg.getRequestBody(), containsString("Gender"));
assertThat(genderMsg.getRequestBody(), not(containsString("Salary")));

Assert.assertTrue(validateMsgs(messages));
}

@Test
Expand All @@ -253,6 +263,7 @@ public void testInternalConfig() throws Exception {

setup(additionalSettings);

final List<String> expectedDocumentsTypes = List.of("config", "actiongroups", "internalusers", "roles", "rolesmapping", "tenants", "audit");
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
try (RestHighLevelClient restHighLevelClient = getRestClient(clusterInfo, "kirk-keystore.jks", "truststore.jks")) {
for (IndexRequest ir : new DynamicSecurityConfig().setSecurityRoles("roles_2.yml").getDynamicConfig(getResourceFolder())) {
Expand All @@ -268,23 +279,19 @@ public void testInternalConfig() throws Exception {
assertThat(response.getStatusCode(), equalTo(HttpStatus.SC_OK));
}, 14);

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_INTERNAL_CONFIG_READ"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_INTERNAL_CONFIG_WRITE"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("anonymous_auth_enabled"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("indices:data/read/suggest"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("internalusers"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("opendistro_security_all_access"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("indices:data/read/suggest"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJzZWFyY2hndWFy"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJBTEwiOlsiaW"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJhZG1pbiI6e"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJzZ19hb"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("eyJzZ19hbGx"));
Assert.assertFalse(TestAuditlogImpl.sb.toString().contains("dvcmYiOnsiY2x"));
Assert.assertTrue(
TestAuditlogImpl.sb.toString().contains("\\\"op\\\":\\\"remove\\\",\\\"path\\\":\\\"/opendistro_security_worf\\\"")
);
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));
final List<String> documentIds = messages.stream().map(AuditMessage::getDocId).distinct().collect(Collectors.toList());
assertThat(documentIds, equalTo(expectedDocumentsTypes));

messages.stream().collect(Collectors.groupingBy(AuditMessage::getDocId)).entrySet().forEach((e) -> {
final String docId = e.getKey();
final List<AuditMessage> messagesByDocId = e.getValue();
assertThat("Doc " + docId + " should have a read/write config message",
messagesByDocId.stream().map(AuditMessage::getCategory).collect(Collectors.toList()),
equalTo(List.of(AuditCategory.COMPLIANCE_INTERNAL_CONFIG_WRITE, AuditCategory.COMPLIANCE_INTERNAL_CONFIG_READ))
);
});

Assert.assertTrue(validateMsgs(messages));
}

@Test
Expand All @@ -301,7 +308,7 @@ public void testExternalConfig() throws Exception {
.put(ConfigConstants.OPENDISTRO_SECURITY_AUDIT_CONFIG_DISABLED_REST_CATEGORIES, "authenticated,GRANTED_PRIVILEGES")
.build();

TestAuditlogImpl.doThenWaitForMessages(() -> {
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
try {
setup(additionalSettings);
} catch (final Exception ex) {
Expand All @@ -318,10 +325,17 @@ public void testExternalConfig() throws Exception {
Assert.assertEquals(HttpStatus.SC_OK, response.getStatusCode());
}, 4);

Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("external_configuration"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("COMPLIANCE_EXTERNAL_CONFIG"));
Assert.assertTrue(TestAuditlogImpl.sb.toString().contains("opensearch_yml"));
Assert.assertTrue(validateMsgs(TestAuditlogImpl.messages));
// Record the updated config, and then for each node record that the config was updated
assertThat(messages.get(0).getCategory(), equalTo(AuditCategory.COMPLIANCE_INTERNAL_CONFIG_WRITE));
assertThat(messages.get(1).getCategory(), equalTo(AuditCategory.COMPLIANCE_EXTERNAL_CONFIG));
assertThat(messages.get(2).getCategory(), equalTo(AuditCategory.COMPLIANCE_EXTERNAL_CONFIG));
assertThat(messages.get(3).getCategory(), equalTo(AuditCategory.COMPLIANCE_EXTERNAL_CONFIG));

// Make sure that the config update messsages are for each node in the cluster
assertThat(messages.get(1).getNodeId(), not(equalTo(messages.get(2).getNodeId())));
assertThat(messages.get(2).getNodeId(), not(equalTo(messages.get(3).getNodeId())));

Assert.assertTrue(validateMsgs(messages));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,10 @@ public void testSSLPlainText() throws Exception {

setup(additionalSettings);
final List<AuditMessage> messages = TestAuditlogImpl.doThenWaitForMessages(() -> {
final RuntimeException ex = Assert.assertThrows(
RuntimeException.class,
() -> nonSslRestHelper().executeGetRequest("_search", encodeBasicHeader("admin", "admin"))
);
final RuntimeException ex = Assert.assertThrows(RuntimeException.class,
() -> nonSslRestHelper().executeGetRequest("_search", encodeBasicHeader("admin", "admin")));
Assert.assertEquals("org.apache.http.NoHttpResponseException", ex.getCause().getClass().getName());
}, 4);
}, 2);

// All of the messages should be the same as the http client is attempting multiple times.
messages.stream().forEach((message) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,66 @@ public static synchronized void clear() {
* Perform an action and then wait until the expected number of messages have been found.
*/
public static List<AuditMessage> doThenWaitForMessages(final Runnable action, final int expectedCount) {
final CountDownLatch latch = new CountDownLatch(expectedCount);
final List<AuditMessage> missedMessages = new ArrayList<>();
final List<AuditMessage> messages = new ArrayList<>();
countDownRef.set(latch);
messagesRef.set(messages);

TestAuditlogImpl.sb = new StringBuffer();
TestAuditlogImpl.messages = messages;

final CountDownLatch latch = resetAuditStorage(expectedCount, messages);

try {
action.run();
final int maxSecondsToWaitForMessages = 1;
final boolean foundAll = latch.await(maxSecondsToWaitForMessages, TimeUnit.SECONDS);
if (!foundAll) {
boolean foundAll = false;
foundAll = latch.await(maxSecondsToWaitForMessages, TimeUnit.SECONDS);
// After the wait has prevent any new messages from being recieved
resetAuditStorage(0, missedMessages);
if (!foundAll || messages.size() != expectedCount) {
throw new MessagesNotFoundException(expectedCount, messages);
}
if (messages.size() != expectedCount) {
throw new RuntimeException(
"Unexpected number of messages, was expecting " + expectedCount + ", received " + messages.size()
);
}
} catch (final InterruptedException e) {
throw new RuntimeException("Unexpected exception", e);
}

// Do not check for missed messages if no messages were expected
if (expectedCount != 0) {
try {
Thread.sleep(100);
if (missedMessages.size() != 0) {
final String missedMessagesErrorMessage = new StringBuilder()
.append("Audit messages were missed! ")
.append("Found " + (missedMessages.size()) + " messages.")
.append("Messages found during this time: \n\n")
.append(missedMessages.stream()
.map(AuditMessage::toString)
.collect(Collectors.joining("\n")))
.toString();

throw new RuntimeException(missedMessagesErrorMessage);
}
} catch (final Exception e) {
throw new RuntimeException("Unexpected exception", e);
}
}

// Next usage of this class might be using raw stringbuilder / list so reset before that test might run
resetAuditStorage(0, new ArrayList<>());
return new ArrayList<>(messages);
}

/**
* Resets all of the mechanics for fresh messages to be captured
*
* @param expectedMessageCount The number of messages before the latch is signalled, indicating all messages have been recieved
* @param message Where messages will be stored after being recieved
*/
private static CountDownLatch resetAuditStorage(int expectedMessageCount, List<AuditMessage> messages) {
final CountDownLatch latch = new CountDownLatch(expectedMessageCount);
countDownRef.set(latch);
messagesRef.set(messages);

TestAuditlogImpl.sb = new StringBuffer();
TestAuditlogImpl.messages = messages;
return latch;
}

/**
* Perform an action and then wait until a single message has been found.
*/
Expand Down

0 comments on commit 3cbe6d5

Please sign in to comment.