Skip to content

Commit

Permalink
[improve] [client]Add new ServiceUrlProvider implementation: SameAuth…
Browse files Browse the repository at this point in the history
…ParamsAutoClusterFailover (#23129)
  • Loading branch information
poorbarcode committed Aug 12, 2024
1 parent 2dde403 commit 06a2f5c
Show file tree
Hide file tree
Showing 9 changed files with 572 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.getTlsFileForClient;
import static org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState;
import io.netty.channel.EventLoopGroup;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.NetworkErrorTestBase;
import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class SameAuthParamsLookupAutoClusterFailoverTest extends OneWayReplicatorTestBase {

public void setup() throws Exception {
super.setup();
}

@Override
@AfterMethod(alwaysRun = true, timeOut = 300000)
public void cleanup() throws Exception {
super.cleanup();
}

@DataProvider(name = "enabledTls")
public Object[][] enabledTls () {
return new Object[][] {
{true},
{false}
};
}

@Test(dataProvider = "enabledTls", timeOut = 240 * 1000)
public void testAutoClusterFailover(boolean enabledTls) throws Exception {
// Start clusters.
setup();
ServerSocket dummyServer = new ServerSocket(NetworkErrorTestBase.getOneFreePort());

// Initialize client.
String urlProxy = enabledTls ? "pulsar+tls://127.0.0.1:" + dummyServer.getLocalPort()
: "pulsar://127.0.0.1:" + dummyServer.getLocalPort();
String url1 = enabledTls ? pulsar1.getBrokerServiceUrlTls() : pulsar1.getBrokerServiceUrl();
String url2 = enabledTls ? pulsar2.getBrokerServiceUrlTls() : pulsar2.getBrokerServiceUrl();
final String[] urlArray = new String[]{url1, urlProxy, url2};
final SameAuthParamsLookupAutoClusterFailover failover = SameAuthParamsLookupAutoClusterFailover.builder()
.pulsarServiceUrlArray(urlArray)
.failoverThreshold(5)
.recoverThreshold(5)
.checkHealthyIntervalMs(300)
.testTopic("a/b/c")
.markTopicNotFoundAsAvailable(true)
.build();
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrlProvider(failover);
if (enabledTls) {
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", getTlsFileForClient("admin.cert"));
authParams.put("tlsKeyFile", getTlsFileForClient("admin.key-pk8"));
clientBuilder.authentication(AuthenticationTls.class.getName(), authParams)
.enableTls(true)
.allowTlsInsecureConnection(false)
.tlsTrustCertsFilePath(CA_CERT_FILE_PATH);
}
final PulsarClient client = clientBuilder.build();
failover.initialize(client);
final EventLoopGroup executor = WhiteboxImpl.getInternalState(failover, "executor");
final PulsarServiceState[] stateArray =
WhiteboxImpl.getInternalState(failover, "pulsarServiceStateArray");

// Test all things is fine.
final String tp = BrokerTestUtil.newUniqueName(nonReplicatedNamespace + "/tp");
final Producer<String> producer = client.newProducer(Schema.STRING).topic(tp).create();
producer.send("0");
Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);

CompletableFuture<Boolean> checkStatesFuture1 = new CompletableFuture<>();
executor.submit(() -> {
boolean res = stateArray[0] == PulsarServiceState.Healthy;
res = res & stateArray[1] == PulsarServiceState.Healthy;
res = res & stateArray[2] == PulsarServiceState.Healthy;
checkStatesFuture1.complete(res);
});
Assert.assertTrue(checkStatesFuture1.join());

// Test failover 0 --> 3.
pulsar1.close();
Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
CompletableFuture<Boolean> checkStatesFuture2 = new CompletableFuture<>();
executor.submit(() -> {
boolean res = stateArray[0] == PulsarServiceState.Failed;
res = res & stateArray[1] == PulsarServiceState.Failed;
res = res & stateArray[2] == PulsarServiceState.Healthy;
checkStatesFuture2.complete(res);
});
Assert.assertTrue(checkStatesFuture2.join());
producer.send("0->2");
Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 2);
});

// Test recover 2 --> 1.
executor.execute(() -> {
urlArray[1] = url2;
});
Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
CompletableFuture<Boolean> checkStatesFuture3 = new CompletableFuture<>();
executor.submit(() -> {
boolean res = stateArray[0] == PulsarServiceState.Failed;
res = res & stateArray[1] == PulsarServiceState.Healthy;
res = res & stateArray[2] == PulsarServiceState.Healthy;
checkStatesFuture3.complete(res);
});
Assert.assertTrue(checkStatesFuture3.join());
producer.send("2->1");
Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 1);
});

// Test recover 1 --> 0.
executor.execute(() -> {
urlArray[0] = url2;
});
Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
CompletableFuture<Boolean> checkStatesFuture4 = new CompletableFuture<>();
executor.submit(() -> {
boolean res = stateArray[0] == PulsarServiceState.Healthy;
res = res & stateArray[1] == PulsarServiceState.Healthy;
res = res & stateArray[2] == PulsarServiceState.Healthy;
checkStatesFuture4.complete(res);
});
Assert.assertTrue(checkStatesFuture4.join());
producer.send("1->0");
Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
});

// cleanup.
producer.close();
client.close();
dummyServer.close();
}

@Override
protected void cleanupPulsarResources() {
// Nothing to do.
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
// All certificate-authority files are copied from the tests/certificate-authority directory and all share the same
// root CA.
protected static String getTlsFileForClient(String name) {
public static String getTlsFileForClient(String name) {
return ResourceUtils.getAbsolutePath(String.format("certificate-authority/client-keys/%s.pem", name));
}
public final static String CA_CERT_FILE_PATH =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected void startBrokers() throws Exception {
log.info("broker-1: {}, broker-2: {}", broker1.getListenPort(), broker2.getListenPort());
}

protected int getOneFreePort() throws IOException {
public static int getOneFreePort() throws IOException {
ServerSocket serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort();
serverSocket.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_CERT_FILE_PATH;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_KEY_FILE_PATH;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -267,10 +270,18 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadBalancerSheddingEnabled(false);
config.setForceDeleteNamespaceAllowed(true);
config.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
config.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
config.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
config.setClusterName(clusterName);
config.setTlsRequireTrustedClientCertOnConnect(false);
Set<String> tlsProtocols = Sets.newConcurrentHashSet();
tlsProtocols.add("TLSv1.3");
tlsProtocols.add("TLSv1.2");
config.setTlsProtocols(tlsProtocols);
}

@Override
protected void cleanup() throws Exception {
protected void cleanupPulsarResources() throws Exception {
// delete namespaces.
waitChangeEventsInit(replicatedNamespace);
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1));
Expand All @@ -283,6 +294,12 @@ protected void cleanup() throws Exception {
admin2.namespaces().deleteNamespace(replicatedNamespace, true);
admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true);
}
}

@Override
protected void cleanup() throws Exception {
// cleanup pulsar resources.
cleanupPulsarResources();

// shutdown.
markCurrentSetupNumberCleaned();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
Expand All @@ -46,10 +47,12 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -542,18 +545,34 @@ public void testReaderInitAtDeletedPosition() throws Exception {
.getStats(topicName, true, true, true).getSubscriptions().get("s1");
log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
assertEquals(subscriptionStats.getMsgBacklog(), 0);
ManagedLedgerInternalStats.CursorStats cursorStats =
admin.topics().getInternalStats(topicName).cursors.get("s1");
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
ManagedLedgerInternalStats.CursorStats cursorStats = internalStats.cursors.get("s1");
String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":");
Position actMarkDeletedPos =
PositionFactory.create(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
Position expectedMarkDeletedPos =
PositionFactory.create(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
ImmutablePositionImpl actMarkDeletedPos =
new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
ImmutablePositionImpl expectedMarkDeletedPos =
new ImmutablePositionImpl(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
log.info("LAC: {}", internalStats.lastConfirmedEntry);
log.info("Expected mark deleted position: {}", expectedMarkDeletedPos);
log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition);
assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
AssertJUnit.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
});

admin.topics().createSubscription(topicName, "s2", MessageId.earliest);
admin.topics().createSubscription(topicName, "s3", MessageId.latest);
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
ManagedLedgerInternalStats.CursorStats cursorStats2 = internalStats.cursors.get("s2");
String[] ledgerIdAndEntryId2 = cursorStats2.markDeletePosition.split(":");
ImmutablePositionImpl actMarkDeletedPos2 =
new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId2[0]), Long.valueOf(ledgerIdAndEntryId2[1]));
ManagedLedgerInternalStats.CursorStats cursorStats3 = internalStats.cursors.get("s3");
String[] ledgerIdAndEntryId3 = cursorStats3.markDeletePosition.split(":");
ImmutablePositionImpl actMarkDeletedPos3 =
new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId3[0]), Long.valueOf(ledgerIdAndEntryId3[1]));
log.info("LAC: {}", internalStats.lastConfirmedEntry);
log.info("Actual mark deleted position 2: {}", actMarkDeletedPos2);
log.info("Actual mark deleted position 3: {}", actMarkDeletedPos3);
pulsar.getBrokerService().getTopic(topicName, false).join().get();
// cleanup.
reader.close();
producer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public interface ServiceUrlProvider extends AutoCloseable {
*
*/
@Override
default void close() {
default void close() throws Exception {
// do nothing
}
}
Loading

0 comments on commit 06a2f5c

Please sign in to comment.