From 06a2f5cc63c126a0262bec4602b81c1c716e4e36 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 13 Aug 2024 00:47:04 +0800 Subject: [PATCH] [improve] [client]Add new ServiceUrlProvider implementation: SameAuthParamsAutoClusterFailover (#23129) --- ...thParamsLookupAutoClusterFailoverTest.java | 176 +++++++++ .../auth/MockedPulsarServiceBaseTest.java | 2 +- .../broker/service/NetworkErrorTestBase.java | 2 +- .../service/OneWayReplicatorTestBase.java | 21 +- .../api/NonDurableSubscriptionTest.java | 33 +- .../pulsar/client/api/ServiceUrlProvider.java | 2 +- ...meAuthParamsLookupAutoClusterFailover.java | 341 ++++++++++++++++++ .../client/impl/AutoClusterFailoverTest.java | 12 +- .../impl/ControlledClusterFailoverTest.java | 5 +- 9 files changed, 572 insertions(+), 22 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java new file mode 100644 index 0000000000000..b39f8135e0e0c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.getTlsFileForClient; +import static org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState; +import io.netty.channel.EventLoopGroup; +import java.net.ServerSocket; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.service.NetworkErrorTestBase; +import org.apache.pulsar.broker.service.OneWayReplicatorTestBase; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class SameAuthParamsLookupAutoClusterFailoverTest extends OneWayReplicatorTestBase { + + public void setup() throws Exception { + super.setup(); + } + + @Override + @AfterMethod(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @DataProvider(name = "enabledTls") + public Object[][] enabledTls () { + return new Object[][] { + {true}, + {false} + }; + } + + @Test(dataProvider = "enabledTls", timeOut = 240 * 1000) + public void testAutoClusterFailover(boolean enabledTls) throws Exception { + // Start clusters. + setup(); + ServerSocket dummyServer = new ServerSocket(NetworkErrorTestBase.getOneFreePort()); + + // Initialize client. + String urlProxy = enabledTls ? "pulsar+tls://127.0.0.1:" + dummyServer.getLocalPort() + : "pulsar://127.0.0.1:" + dummyServer.getLocalPort(); + String url1 = enabledTls ? pulsar1.getBrokerServiceUrlTls() : pulsar1.getBrokerServiceUrl(); + String url2 = enabledTls ? pulsar2.getBrokerServiceUrlTls() : pulsar2.getBrokerServiceUrl(); + final String[] urlArray = new String[]{url1, urlProxy, url2}; + final SameAuthParamsLookupAutoClusterFailover failover = SameAuthParamsLookupAutoClusterFailover.builder() + .pulsarServiceUrlArray(urlArray) + .failoverThreshold(5) + .recoverThreshold(5) + .checkHealthyIntervalMs(300) + .testTopic("a/b/c") + .markTopicNotFoundAsAvailable(true) + .build(); + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrlProvider(failover); + if (enabledTls) { + Map authParams = new HashMap<>(); + authParams.put("tlsCertFile", getTlsFileForClient("admin.cert")); + authParams.put("tlsKeyFile", getTlsFileForClient("admin.key-pk8")); + clientBuilder.authentication(AuthenticationTls.class.getName(), authParams) + .enableTls(true) + .allowTlsInsecureConnection(false) + .tlsTrustCertsFilePath(CA_CERT_FILE_PATH); + } + final PulsarClient client = clientBuilder.build(); + failover.initialize(client); + final EventLoopGroup executor = WhiteboxImpl.getInternalState(failover, "executor"); + final PulsarServiceState[] stateArray = + WhiteboxImpl.getInternalState(failover, "pulsarServiceStateArray"); + + // Test all things is fine. + final String tp = BrokerTestUtil.newUniqueName(nonReplicatedNamespace + "/tp"); + final Producer producer = client.newProducer(Schema.STRING).topic(tp).create(); + producer.send("0"); + Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0); + + CompletableFuture checkStatesFuture1 = new CompletableFuture<>(); + executor.submit(() -> { + boolean res = stateArray[0] == PulsarServiceState.Healthy; + res = res & stateArray[1] == PulsarServiceState.Healthy; + res = res & stateArray[2] == PulsarServiceState.Healthy; + checkStatesFuture1.complete(res); + }); + Assert.assertTrue(checkStatesFuture1.join()); + + // Test failover 0 --> 3. + pulsar1.close(); + Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> { + CompletableFuture checkStatesFuture2 = new CompletableFuture<>(); + executor.submit(() -> { + boolean res = stateArray[0] == PulsarServiceState.Failed; + res = res & stateArray[1] == PulsarServiceState.Failed; + res = res & stateArray[2] == PulsarServiceState.Healthy; + checkStatesFuture2.complete(res); + }); + Assert.assertTrue(checkStatesFuture2.join()); + producer.send("0->2"); + Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 2); + }); + + // Test recover 2 --> 1. + executor.execute(() -> { + urlArray[1] = url2; + }); + Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> { + CompletableFuture checkStatesFuture3 = new CompletableFuture<>(); + executor.submit(() -> { + boolean res = stateArray[0] == PulsarServiceState.Failed; + res = res & stateArray[1] == PulsarServiceState.Healthy; + res = res & stateArray[2] == PulsarServiceState.Healthy; + checkStatesFuture3.complete(res); + }); + Assert.assertTrue(checkStatesFuture3.join()); + producer.send("2->1"); + Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 1); + }); + + // Test recover 1 --> 0. + executor.execute(() -> { + urlArray[0] = url2; + }); + Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> { + CompletableFuture checkStatesFuture4 = new CompletableFuture<>(); + executor.submit(() -> { + boolean res = stateArray[0] == PulsarServiceState.Healthy; + res = res & stateArray[1] == PulsarServiceState.Healthy; + res = res & stateArray[2] == PulsarServiceState.Healthy; + checkStatesFuture4.complete(res); + }); + Assert.assertTrue(checkStatesFuture4.join()); + producer.send("1->0"); + Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0); + }); + + // cleanup. + producer.close(); + client.close(); + dummyServer.close(); + } + + @Override + protected void cleanupPulsarResources() { + // Nothing to do. + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index eef4469aa95fa..e155e399e2437 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -84,7 +84,7 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { // All certificate-authority files are copied from the tests/certificate-authority directory and all share the same // root CA. - protected static String getTlsFileForClient(String name) { + public static String getTlsFileForClient(String name) { return ResourceUtils.getAbsolutePath(String.format("certificate-authority/client-keys/%s.pem", name)); } public final static String CA_CERT_FILE_PATH = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java index 36f8cb4761248..742194d9b12a1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java @@ -102,7 +102,7 @@ protected void startBrokers() throws Exception { log.info("broker-1: {}, broker-2: {}", broker1.getListenPort(), broker2.getListenPort()); } - protected int getOneFreePort() throws IOException { + public static int getOneFreePort() throws IOException { ServerSocket serverSocket = new ServerSocket(0); int port = serverSocket.getLocalPort(); serverSocket.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index d66e666e3a055..f3076ebdec6c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_CERT_FILE_PATH; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_KEY_FILE_PATH; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -267,10 +270,18 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); config.setLoadBalancerSheddingEnabled(false); config.setForceDeleteNamespaceAllowed(true); + config.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH); + config.setTlsKeyFilePath(BROKER_KEY_FILE_PATH); + config.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH); + config.setClusterName(clusterName); + config.setTlsRequireTrustedClientCertOnConnect(false); + Set tlsProtocols = Sets.newConcurrentHashSet(); + tlsProtocols.add("TLSv1.3"); + tlsProtocols.add("TLSv1.2"); + config.setTlsProtocols(tlsProtocols); } - @Override - protected void cleanup() throws Exception { + protected void cleanupPulsarResources() throws Exception { // delete namespaces. waitChangeEventsInit(replicatedNamespace); admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1)); @@ -283,6 +294,12 @@ protected void cleanup() throws Exception { admin2.namespaces().deleteNamespace(replicatedNamespace, true); admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true); } + } + + @Override + protected void cleanup() throws Exception { + // cleanup pulsar resources. + cleanupPulsarResources(); // shutdown. markCurrentSetupNumberCleaned(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java index bbac688d9224c..80adc79e6fee8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; @@ -46,10 +47,12 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.proto.CommandFlow; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; +import org.testng.AssertJUnit; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -542,18 +545,34 @@ public void testReaderInitAtDeletedPosition() throws Exception { .getStats(topicName, true, true, true).getSubscriptions().get("s1"); log.info("backlog size: {}", subscriptionStats.getMsgBacklog()); assertEquals(subscriptionStats.getMsgBacklog(), 0); - ManagedLedgerInternalStats.CursorStats cursorStats = - admin.topics().getInternalStats(topicName).cursors.get("s1"); + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName); + ManagedLedgerInternalStats.CursorStats cursorStats = internalStats.cursors.get("s1"); String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":"); - Position actMarkDeletedPos = - PositionFactory.create(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1])); - Position expectedMarkDeletedPos = - PositionFactory.create(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId()); + ImmutablePositionImpl actMarkDeletedPos = + new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1])); + ImmutablePositionImpl expectedMarkDeletedPos = + new ImmutablePositionImpl(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId()); + log.info("LAC: {}", internalStats.lastConfirmedEntry); log.info("Expected mark deleted position: {}", expectedMarkDeletedPos); log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition); - assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0); + AssertJUnit.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0); }); + admin.topics().createSubscription(topicName, "s2", MessageId.earliest); + admin.topics().createSubscription(topicName, "s3", MessageId.latest); + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName); + ManagedLedgerInternalStats.CursorStats cursorStats2 = internalStats.cursors.get("s2"); + String[] ledgerIdAndEntryId2 = cursorStats2.markDeletePosition.split(":"); + ImmutablePositionImpl actMarkDeletedPos2 = + new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId2[0]), Long.valueOf(ledgerIdAndEntryId2[1])); + ManagedLedgerInternalStats.CursorStats cursorStats3 = internalStats.cursors.get("s3"); + String[] ledgerIdAndEntryId3 = cursorStats3.markDeletePosition.split(":"); + ImmutablePositionImpl actMarkDeletedPos3 = + new ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId3[0]), Long.valueOf(ledgerIdAndEntryId3[1])); + log.info("LAC: {}", internalStats.lastConfirmedEntry); + log.info("Actual mark deleted position 2: {}", actMarkDeletedPos2); + log.info("Actual mark deleted position 3: {}", actMarkDeletedPos3); + pulsar.getBrokerService().getTopic(topicName, false).join().get(); // cleanup. reader.close(); producer.close(); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java index 5cb22276553ab..e8b513b103f65 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java @@ -56,7 +56,7 @@ public interface ServiceUrlProvider extends AutoCloseable { * */ @Override - default void close() { + default void close() throws Exception { // do nothing } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java new file mode 100644 index 0000000000000..4beff4719c895 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.ScheduledFuture; +import java.util.Arrays; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.netty.EventLoopUtil; + +@Slf4j +@SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}) +public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvider { + + private PulsarClientImpl pulsarClient; + private EventLoopGroup executor; + private volatile boolean closed; + private ScheduledFuture scheduledCheckTask; + @Getter + private int failoverThreshold = 5; + @Getter + private int recoverThreshold = 5; + @Getter + private long checkHealthyIntervalMs = 1000; + @Getter + private boolean markTopicNotFoundAsAvailable = true; + @Getter + private String testTopic = "public/default/tp_test"; + + private String[] pulsarServiceUrlArray; + private PulsarServiceState[] pulsarServiceStateArray; + private MutableInt[] checkCounterArray; + @Getter + private volatile int currentPulsarServiceIndex; + + private SameAuthParamsLookupAutoClusterFailover() {} + + @Override + public void initialize(PulsarClient client) { + this.currentPulsarServiceIndex = 0; + this.pulsarClient = (PulsarClientImpl) client; + this.executor = EventLoopUtil.newEventLoopGroup(1, false, + new ExecutorProvider.ExtendedThreadFactory("broker-service-url-check")); + scheduledCheckTask = executor.scheduleAtFixedRate(() -> { + if (closed) { + return; + } + checkPulsarServices(); + int firstHealthyPulsarService = firstHealthyPulsarService(); + if (firstHealthyPulsarService == currentPulsarServiceIndex) { + return; + } + if (firstHealthyPulsarService < 0) { + int failoverTo = findFailoverTo(); + if (failoverTo < 0) { + // No healthy pulsar service to connect. + log.error("Failed to choose a pulsar service to connect, no one pulsar service is healthy. Current" + + " pulsar service: [{}] {}. States: {}, Counters: {}", currentPulsarServiceIndex, + pulsarServiceUrlArray[currentPulsarServiceIndex], Arrays.toString(pulsarServiceStateArray), + Arrays.toString(checkCounterArray)); + } else { + // Failover to low priority pulsar service. + updateServiceUrl(failoverTo); + } + } else { + // Back to high priority pulsar service. + updateServiceUrl(firstHealthyPulsarService); + } + }, checkHealthyIntervalMs, checkHealthyIntervalMs, TimeUnit.MILLISECONDS); + } + + @Override + public String getServiceUrl() { + return pulsarServiceUrlArray[currentPulsarServiceIndex]; + } + + @Override + public void close() throws Exception { + log.info("Closing service url provider. Current pulsar service: [{}] {}", currentPulsarServiceIndex, + pulsarServiceUrlArray[currentPulsarServiceIndex]); + closed = true; + scheduledCheckTask.cancel(false); + executor.shutdownNow(); + } + + private int firstHealthyPulsarService() { + for (int i = 0; i <= currentPulsarServiceIndex; i++) { + if (pulsarServiceStateArray[i] == PulsarServiceState.Healthy + || pulsarServiceStateArray[i] == PulsarServiceState.PreFail) { + return i; + } + } + return -1; + } + + private int findFailoverTo() { + for (int i = currentPulsarServiceIndex + 1; i <= pulsarServiceUrlArray.length; i++) { + if (probeAvailable(i)) { + return i; + } + } + return -1; + } + + private void checkPulsarServices() { + for (int i = 0; i <= currentPulsarServiceIndex; i++) { + if (probeAvailable(i)) { + switch (pulsarServiceStateArray[i]) { + case Healthy: { + break; + } + case PreFail: { + pulsarServiceStateArray[i] = PulsarServiceState.Healthy; + checkCounterArray[i].setValue(0); + break; + } + case Failed: { + pulsarServiceStateArray[i] = PulsarServiceState.PreRecover; + checkCounterArray[i].setValue(1); + break; + } + case PreRecover: { + checkCounterArray[i].setValue(checkCounterArray[i].getValue() + 1); + if (checkCounterArray[i].getValue() >= recoverThreshold) { + pulsarServiceStateArray[i] = PulsarServiceState.Healthy; + checkCounterArray[i].setValue(0); + } + break; + } + } + } else { + switch (pulsarServiceStateArray[i]) { + case Healthy: { + pulsarServiceStateArray[i] = PulsarServiceState.PreFail; + checkCounterArray[i].setValue(1); + break; + } + case PreFail: { + checkCounterArray[i].setValue(checkCounterArray[i].getValue() + 1); + if (checkCounterArray[i].getValue() >= failoverThreshold) { + pulsarServiceStateArray[i] = PulsarServiceState.Failed; + checkCounterArray[i].setValue(0); + } + break; + } + case Failed: { + break; + } + case PreRecover: { + pulsarServiceStateArray[i] = PulsarServiceState.Failed; + checkCounterArray[i].setValue(0); + break; + } + } + } + } + } + + private boolean probeAvailable(int brokerServiceIndex) { + String url = pulsarServiceUrlArray[brokerServiceIndex]; + try { + LookupTopicResult res = pulsarClient.getLookup(url).getBroker(TopicName.get(testTopic)) + .get(3, TimeUnit.SECONDS); + if (log.isDebugEnabled()) { + log.debug("Success to probe available(lookup res: {}), [{}] {}}. States: {}, Counters: {}", + res.toString(), brokerServiceIndex, url, Arrays.toString(pulsarServiceStateArray), + Arrays.toString(checkCounterArray)); + } + return true; + } catch (Exception e) { + Throwable actEx = FutureUtil.unwrapCompletionException(e); + if (actEx instanceof PulsarAdminException.NotFoundException + || actEx instanceof PulsarClientException.NotFoundException + || actEx instanceof PulsarClientException.TopicDoesNotExistException + || actEx instanceof PulsarClientException.LookupException) { + if (markTopicNotFoundAsAvailable) { + if (log.isDebugEnabled()) { + log.debug("Success to probe available(case tenant/namespace/topic not found), [{}] {}." + + " States: {}, Counters: {}", brokerServiceIndex, url, + Arrays.toString(pulsarServiceStateArray), Arrays.toString(checkCounterArray)); + } + return true; + } else { + log.warn("Failed to probe available(error tenant/namespace/topic not found), [{}] {}. States: {}," + + " Counters: {}", brokerServiceIndex, url, Arrays.toString(pulsarServiceStateArray), + Arrays.toString(checkCounterArray)); + return false; + } + } + log.warn("Failed to probe available, [{}] {}. States: {}, Counters: {}", brokerServiceIndex, url, + Arrays.toString(pulsarServiceStateArray), Arrays.toString(checkCounterArray)); + return false; + } + } + + private void updateServiceUrl(int targetIndex) { + String currentUrl = pulsarServiceUrlArray[currentPulsarServiceIndex]; + String targetUrl = pulsarServiceUrlArray[targetIndex]; + String logMsg; + if (targetIndex < currentPulsarServiceIndex) { + logMsg = String.format("Recover to high priority pulsar service [%s] %s --> [%s] %s. States: %s," + + " Counters: %s", currentPulsarServiceIndex, currentUrl, targetIndex, targetUrl, + Arrays.toString(pulsarServiceStateArray), Arrays.toString(checkCounterArray)); + } else { + logMsg = String.format("Failover to low priority pulsar service [%s] %s --> [%s] %s. States: %s," + + " Counters: %s", currentPulsarServiceIndex, currentUrl, targetIndex, targetUrl, + Arrays.toString(pulsarServiceStateArray), Arrays.toString(checkCounterArray)); + } + log.info(logMsg); + try { + pulsarClient.updateServiceUrl(targetUrl); + pulsarClient.reloadLookUp(); + currentPulsarServiceIndex = targetIndex; + } catch (Exception e) { + log.error("Failed to {}", logMsg, e); + } + } + + public enum PulsarServiceState { + Healthy, + PreFail, + Failed, + PreRecover; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private SameAuthParamsLookupAutoClusterFailover + sameAuthParamsLookupAutoClusterFailover = new SameAuthParamsLookupAutoClusterFailover(); + + public Builder failoverThreshold(int failoverThreshold) { + if (failoverThreshold < 1) { + throw new IllegalArgumentException("failoverThreshold must be larger than 0"); + } + sameAuthParamsLookupAutoClusterFailover.failoverThreshold = failoverThreshold; + return this; + } + + public Builder recoverThreshold(int recoverThreshold) { + if (recoverThreshold < 1) { + throw new IllegalArgumentException("recoverThreshold must be larger than 0"); + } + sameAuthParamsLookupAutoClusterFailover.recoverThreshold = recoverThreshold; + return this; + } + + public Builder checkHealthyIntervalMs(int checkHealthyIntervalMs) { + if (checkHealthyIntervalMs < 1) { + throw new IllegalArgumentException("checkHealthyIntervalMs must be larger than 0"); + } + sameAuthParamsLookupAutoClusterFailover.checkHealthyIntervalMs = checkHealthyIntervalMs; + return this; + } + + public Builder testTopic(String testTopic) { + if (StringUtils.isBlank(testTopic) && TopicName.get(testTopic) != null) { + throw new IllegalArgumentException("testTopic can not be blank"); + } + sameAuthParamsLookupAutoClusterFailover.testTopic = testTopic; + return this; + } + + public Builder markTopicNotFoundAsAvailable(boolean markTopicNotFoundAsAvailable) { + sameAuthParamsLookupAutoClusterFailover.markTopicNotFoundAsAvailable = markTopicNotFoundAsAvailable; + return this; + } + + public Builder pulsarServiceUrlArray(String[] pulsarServiceUrlArray) { + if (pulsarServiceUrlArray == null || pulsarServiceUrlArray.length == 0) { + throw new IllegalArgumentException("pulsarServiceUrlArray can not be empty"); + } + sameAuthParamsLookupAutoClusterFailover.pulsarServiceUrlArray = pulsarServiceUrlArray; + int pulsarServiceLen = pulsarServiceUrlArray.length; + HashSet uniqueChecker = new HashSet<>(); + for (int i = 0; i < pulsarServiceLen; i++) { + String pulsarService = pulsarServiceUrlArray[i]; + if (StringUtils.isBlank(pulsarService)) { + throw new IllegalArgumentException("pulsarServiceUrlArray contains a blank value at index " + i); + } + if (pulsarService.startsWith("http") || pulsarService.startsWith("HTTP")) { + throw new IllegalArgumentException("SameAuthParamsLookupAutoClusterFailover does not support HTTP" + + " protocol pulsar service url so far."); + } + if (!uniqueChecker.add(pulsarService)) { + throw new IllegalArgumentException("pulsarServiceUrlArray contains duplicated value " + + pulsarServiceUrlArray[i]); + } + } + return this; + } + + public SameAuthParamsLookupAutoClusterFailover build() { + String[] pulsarServiceUrlArray = sameAuthParamsLookupAutoClusterFailover.pulsarServiceUrlArray; + if (pulsarServiceUrlArray == null) { + throw new IllegalArgumentException("pulsarServiceUrlArray can not be empty"); + } + int pulsarServiceLen = pulsarServiceUrlArray.length; + sameAuthParamsLookupAutoClusterFailover.pulsarServiceStateArray = new PulsarServiceState[pulsarServiceLen]; + sameAuthParamsLookupAutoClusterFailover.checkCounterArray = new MutableInt[pulsarServiceLen]; + for (int i = 0; i < pulsarServiceLen; i++) { + sameAuthParamsLookupAutoClusterFailover.pulsarServiceStateArray[i] = PulsarServiceState.Healthy; + sameAuthParamsLookupAutoClusterFailover.checkCounterArray[i] = new MutableInt(0); + } + return sameAuthParamsLookupAutoClusterFailover; + } + } +} + diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java index 545cf7483e4e3..b275ffb6012ca 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java @@ -23,7 +23,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -32,7 +31,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.awaitility.Awaitility; @@ -43,7 +41,7 @@ @Slf4j public class AutoClusterFailoverTest { @Test - public void testBuildAutoClusterFailoverInstance() throws PulsarClientException { + public void testBuildAutoClusterFailoverInstance() throws Exception { String primary = "pulsar://localhost:6650"; String secondary = "pulsar://localhost:6651"; long failoverDelay = 30; @@ -106,7 +104,7 @@ public void testBuildAutoClusterFailoverInstance() throws PulsarClientException } @Test - public void testInitialize() { + public void testInitialize() throws Exception { String primary = "pulsar://localhost:6650"; String secondary = "pulsar://localhost:6651"; long failoverDelay = 10; @@ -151,7 +149,7 @@ public void testInitialize() { } @Test - public void testAutoClusterFailoverSwitchWithoutAuthentication() { + public void testAutoClusterFailoverSwitchWithoutAuthentication() throws Exception { String primary = "pulsar://localhost:6650"; String secondary = "pulsar://localhost:6651"; long failoverDelay = 1; @@ -187,7 +185,7 @@ public void testAutoClusterFailoverSwitchWithoutAuthentication() { } @Test - public void testAutoClusterFailoverSwitchWithAuthentication() throws IOException { + public void testAutoClusterFailoverSwitchWithAuthentication() throws Exception { String primary = "pulsar+ssl://localhost:6651"; String secondary = "pulsar+ssl://localhost:6661"; long failoverDelay = 1; @@ -251,7 +249,7 @@ public void testAutoClusterFailoverSwitchWithAuthentication() throws IOException } @Test - public void testAutoClusterFailoverSwitchTlsTrustStore() throws IOException { + public void testAutoClusterFailoverSwitchTlsTrustStore() throws Exception { String primary = "pulsar+ssl://localhost:6651"; String secondary = "pulsar+ssl://localhost:6661"; long failoverDelay = 1; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java index 36160d40d540a..fa7145794e1e2 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.impl; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -37,7 +36,7 @@ @Test(groups = "broker-impl") public class ControlledClusterFailoverTest { @Test - public void testBuildControlledClusterFailoverInstance() throws IOException { + public void testBuildControlledClusterFailoverInstance() throws Exception { String defaultServiceUrl = "pulsar://localhost:6650"; String urlProvider = "http://localhost:8080/test"; String keyA = "key-a"; @@ -67,7 +66,7 @@ public void testBuildControlledClusterFailoverInstance() throws IOException { } @Test - public void testControlledClusterFailoverSwitch() throws IOException { + public void testControlledClusterFailoverSwitch() throws Exception { String defaultServiceUrl = "pulsar+ssl://localhost:6651"; String backupServiceUrl = "pulsar+ssl://localhost:6661"; String urlProvider = "http://localhost:8080";