producer;
- private final ProducerStats stats;
- private long cnt = 0;
- String topic;
- long numberOfRecords;
- int recordSize;
- long throughput;
-}
diff --git a/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/ProducerStats.java b/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/ProducerStats.java
deleted file mode 100644
index c5f5263e7c23..000000000000
--- a/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/ProducerStats.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package io.vectorized.kafka;
-
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.RecordMetadata;
-
-public class ProducerStats {
- private long start;
- private long windowStart;
- private int[] latencies;
- private int sampling;
- private int iteration;
- private int index;
- private long count;
- private long bytes;
- private int maxLatency;
- private long totalLatency;
- private long windowCount;
- private int windowMaxLatency;
- private long windowTotalLatency;
- private long windowBytes;
- private long reportingInterval;
- private Expectations ex;
-
- public ProducerStats(
- long numRecords, int reportingInterval, Expectations ex) {
- this.start = System.currentTimeMillis();
- this.windowStart = System.currentTimeMillis();
- this.iteration = 0;
- this.sampling = (int)(numRecords / Math.min(numRecords, 500000));
- this.latencies = new int[(int)(numRecords / this.sampling) + 1];
- this.index = 0;
- this.maxLatency = 0;
- this.totalLatency = 0;
- this.windowCount = 0;
- this.windowMaxLatency = 0;
- this.windowTotalLatency = 0;
- this.windowBytes = 0;
- this.totalLatency = 0;
- this.reportingInterval = reportingInterval;
- this.ex = ex;
- }
-
- public void record(
- int iter, int latency, int bytes, long time, RecordMetadata md,
- long recordTs, long recordCnt) {
- this.count++;
- this.bytes += bytes;
- this.totalLatency += latency;
- this.maxLatency = Math.max(this.maxLatency, latency);
- this.windowCount++;
- this.windowBytes += bytes;
- this.windowTotalLatency += latency;
- this.windowMaxLatency = Math.max(windowMaxLatency, latency);
- if (iter % this.sampling == 0) {
- this.latencies[index] = latency;
- this.index++;
- }
- /* maybe report the recent perf */
- if (time - windowStart >= reportingInterval) {
- printWindow();
- newWindow();
- }
- ex.put(md, recordTs, recordCnt);
- }
-
- public Callback nextCompletion(
- long start, int bytes, ProducerStats stats, long record_ts,
- long record_cnt) {
- Callback cb = new VerifierCallback(
- this.iteration, start, bytes, stats, record_ts, record_cnt);
- this.iteration++;
- return cb;
- }
-
- public void printWindow() {
- long ellapsed = System.currentTimeMillis() - windowStart;
- double recsPerSec = 1000.0 * windowCount / (double)ellapsed;
- double mbPerSec
- = 1000.0 * this.windowBytes / (double)ellapsed / (1024.0 * 1024.0);
- System.out.printf(
- "PROD - %d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n",
- windowCount, recsPerSec, mbPerSec,
- windowTotalLatency / (double)windowCount, (double)windowMaxLatency);
- ex.printQueues();
- }
-
- public void newWindow() {
- this.windowStart = System.currentTimeMillis();
- this.windowCount = 0;
- this.windowMaxLatency = 0;
- this.windowTotalLatency = 0;
- this.windowBytes = 0;
- }
-
- public void printTotal() {
- long elapsed = System.currentTimeMillis() - start;
- double recsPerSec = 1000.0 * count / (double)elapsed;
- double mbPerSec = 1000.0 * this.bytes / (double)elapsed / (1024.0 * 1024.0);
- int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
- System.out.printf(
- "PROD - %d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
- count, recsPerSec, mbPerSec, totalLatency / (double)count,
- (double)maxLatency, percs[0], percs[1], percs[2], percs[3]);
- }
-
- private static int[] percentiles(
- int[] latencies, int count, double... percentiles) {
- int size = Math.min(count, latencies.length);
- Arrays.sort(latencies, 0, size);
- int[] values = new int[percentiles.length];
- for (int i = 0; i < percentiles.length; i++) {
- int index = (int)(percentiles[i] * size);
- values[i] = latencies[index];
- }
- return values;
- }
-}
diff --git a/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/ThroughputThrottler.java b/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/ThroughputThrottler.java
deleted file mode 100644
index 6cce294176e2..000000000000
--- a/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/ThroughputThrottler.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package io.vectorized.kafka;
-
-/**
- * This class helps producers throttle throughput.
- *
- * If targetThroughput >= 0, the resulting average throughput will be
- * approximately min(targetThroughput, maximumPossibleThroughput). If
- * targetThroughput < 0, no throttling will occur.
- *
- * To use, do this between successive send attempts:
- *
- * {@code
- * if (throttler.shouldThrottle(...)) {
- * throttler.throttle();
- * }
- * }
- *
- *
- * Note that this can be used to throttle message throughput or data throughput.
- */
-public class ThroughputThrottler {
-
- private static final long NS_PER_MS = 1000000L;
- private static final long NS_PER_SEC = 1000 * NS_PER_MS;
- private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
-
- private final long startMs;
- private final long sleepTimeNs;
- private final long targetThroughput;
-
- private long sleepDeficitNs = 0;
- private boolean wakeup = false;
-
- /**
- * @param targetThroughput Can be messages/sec or bytes/sec
- * @param startMs When the very first message is sent
- */
- public ThroughputThrottler(long targetThroughput, long startMs) {
- this.startMs = startMs;
- this.targetThroughput = targetThroughput;
- this.sleepTimeNs
- = targetThroughput > 0 ? NS_PER_SEC / targetThroughput : Long.MAX_VALUE;
- }
-
- /**
- * @param amountSoFar bytes produced so far if you want to throttle data
- * throughput, or
- * messages produced so far if you want to throttle message
- * throughput.
- * @param sendStartMs timestamp of the most recently sent message
- * @return
- */
- public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
- if (this.targetThroughput < 0) {
- // No throttling in this case
- return false;
- }
-
- float elapsedSec = (sendStartMs - startMs) / 1000.f;
- return elapsedSec > 0 && (amountSoFar / elapsedSec) > this.targetThroughput;
- }
-
- /**
- * Occasionally blocks for small amounts of time to achieve targetThroughput.
- *
- * Note that if targetThroughput is 0, this will block extremely aggressively.
- */
- public void throttle() {
- if (targetThroughput == 0) {
- try {
- synchronized (this) {
- while (!wakeup) {
- this.wait();
- }
- }
- } catch (InterruptedException e) {
- // do nothing
- }
- return;
- }
-
- // throttle throughput by sleeping, on average,
- // (1 / this.throughput) seconds between "things sent"
- sleepDeficitNs += sleepTimeNs;
-
- // If enough sleep deficit has accumulated, sleep a little
- if (sleepDeficitNs >= MIN_SLEEP_NS) {
- long sleepStartNs = System.nanoTime();
- try {
- synchronized (this) {
- long remaining = sleepDeficitNs;
- while (!wakeup && remaining > 0) {
- long sleepMs = remaining / 1000000;
- long sleepNs = remaining - sleepMs * 1000000;
- this.wait(sleepMs, (int)sleepNs);
- long elapsed = System.nanoTime() - sleepStartNs;
- remaining = sleepDeficitNs - elapsed;
- }
- wakeup = false;
- }
- sleepDeficitNs = 0;
- } catch (InterruptedException e) {
- // If sleep is cut short, reduce deficit by the amount of
- // time we actually spent sleeping
- long sleepElapsedNs = System.nanoTime() - sleepStartNs;
- if (sleepElapsedNs <= sleepDeficitNs) {
- sleepDeficitNs -= sleepElapsedNs;
- }
- }
- }
- }
-
- /**
- * Wakeup the throttler if its sleeping.
- */
- public void wakeup() {
- synchronized (this) {
- wakeup = true;
- this.notifyAll();
- }
- }
-}
diff --git a/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/ToolsUtils.java b/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/ToolsUtils.java
deleted file mode 100644
index 9560ac3f6595..000000000000
--- a/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/ToolsUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package io.vectorized.kafka;
-
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-
-public class ToolsUtils {
- public static void printMetrics(Map metrics) {
- if (metrics != null && !metrics.isEmpty()) {
- int maxLengthOfDisplayName = 0;
- TreeMap sortedMetrics = new TreeMap<>();
- for (Metric metric : metrics.values()) {
- MetricName mName = metric.metricName();
- String mergedName
- = mName.group() + ":" + mName.name() + ":" + mName.tags();
- maxLengthOfDisplayName = maxLengthOfDisplayName < mergedName.length()
- ? mergedName.length()
- : maxLengthOfDisplayName;
- sortedMetrics.put(mergedName, metric.metricValue());
- }
- String doubleOutputFormat = "%-" + maxLengthOfDisplayName + "s : %.3f";
- String defaultOutputFormat = "%-" + maxLengthOfDisplayName + "s : %s";
- System.out.println(String.format(
- "\n%-" + maxLengthOfDisplayName + "s %s", "Metric Name", "Value"));
-
- for (Map.Entry entry : sortedMetrics.entrySet()) {
- String outputFormat;
- if (entry.getValue() instanceof Double)
- outputFormat = doubleOutputFormat;
- else
- outputFormat = defaultOutputFormat;
- System.out.println(
- String.format(outputFormat, entry.getKey(), entry.getValue()));
- }
- }
- }
-}
diff --git a/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/Verifier.java b/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/Verifier.java
deleted file mode 100644
index c361d05008d4..000000000000
--- a/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/Verifier.java
+++ /dev/null
@@ -1,257 +0,0 @@
-package io.vectorized.kafka;
-
-import static net.sourceforge.argparse4j.impl.Arguments.store;
-import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import net.sourceforge.argparse4j.ArgumentParsers;
-import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
-import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
-import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.ListTopicsOptions;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class Verifier {
- private static final Logger logger = LoggerFactory.getLogger(Verifier.class);
-
- /** Get the command-line argument parser. */
- private static ArgumentParser argParser() {
- final ArgumentParser parser
- = ArgumentParsers.newFor("kafka-verifier").addHelp(true).build();
-
- final MutuallyExclusiveGroup payloadOptions
- = parser.addMutuallyExclusiveGroup().required(true).description(
- "either --record-size or --payload-file must be specified"
- + " but not both.");
-
- parser.addArgument("--broker")
- .action(store())
- .required(true)
- .type(String.class)
- .metavar("BROKER")
- .help("broker address");
- parser.addArgument("--topic")
- .action(store())
- .required(true)
- .type(String.class)
- .metavar("TOPIC")
- .help("produce messages to this topic");
- parser.addArgument("--replication-factor")
- .action(store())
- .required(true)
- .type(Short.class)
- .metavar("R-G")
- .dest("replicationFactor")
- .help("topic replication factor");
- parser.addArgument("--partitions")
- .action(store())
- .required(true)
- .type(Integer.class)
- .metavar("PARTITIONS")
- .dest("partitions")
- .help("topic partition count");
- parser.addArgument("--num-records")
- .action(store())
- .required(true)
- .type(Long.class)
- .metavar("NUM-RECORDS")
- .dest("numRecords")
- .help("number of messages to produce");
-
- payloadOptions.addArgument("--record-size")
- .action(store())
- .required(false)
- .type(Integer.class)
- .metavar("RECORD-SIZE")
- .dest("recordSize")
- .help(
- "message size in bytes. Note that you must provide exactly one of"
- + " --record-size or --payload-file.");
-
- parser.addArgument("--throughput")
- .action(store())
- .required(true)
- .type(Integer.class)
- .metavar("THROUGHPUT")
- .help(
- "throttle maximum message throughput to *approximately* THROUGHPUT"
- + " messages/sec. Set this to -1 to disable throttling.");
-
- parser.addArgument("--producer-props")
- .nargs("+")
- .required(false)
- .metavar("PROP-NAME=PROP-VALUE")
- .type(String.class)
- .dest("producerConfig")
- .help(
- "kafka producer related configuration properties like "
- + "bootstrap.servers,client.id etc. "
- + "These configs take precedence over those passed via"
- + " --producer.config.");
-
- parser.addArgument("--producer.config")
- .action(store())
- .required(false)
- .type(String.class)
- .metavar("CONFIG-FILE")
- .dest("producerConfigFile")
- .help("producer config properties file.");
-
- parser.addArgument("--consumer-props")
- .nargs("+")
- .required(false)
- .metavar("PROP-NAME=PROP-VALUE")
- .type(String.class)
- .dest("consumerConfig")
- .help(
- "kafka consumer related configuration properties like "
- + "bootstrap.servers,client.id etc. "
- + "These configs take precedence over those passed via "
- + "--consumer.config.");
-
- parser.addArgument("--consumer.config")
- .action(store())
- .required(false)
- .type(String.class)
- .metavar("CONFIG-FILE")
- .dest("consumerConfigFile")
- .help("consumer config properties file.");
-
- parser.addArgument("--random-consumer-group")
- .action(store())
- .required(false)
- .type(Boolean.class)
- .metavar("RANDOM-CONSUMER-GROUP")
- .dest("randomConsGroup")
- .help("randomize consumer group name");
- return parser;
- }
-
- private static Properties createProperties(
- final String producerConfig, final List producerProps)
- throws IOException {
-
- final Properties props = new Properties();
- if (producerConfig != null) {
- props.putAll(Utils.loadProps(producerConfig));
- }
- if (producerProps != null) {
- for (final String prop : producerProps) {
- final String[] pieces = prop.split("=");
- if (pieces.length != 2)
- throw new IllegalArgumentException("Invalid property: " + prop);
- props.put(pieces[0], pieces[1]);
- }
- }
- return props;
- }
-
- public static void main(final String[] args) throws Exception {
- final ArgumentParser parser = argParser();
-
- try {
- final Namespace res = parser.parseArgs(args);
-
- /* parse args */
- final String broker = res.getString("broker");
- final String topicName = res.getString("topic");
- final short replicationFactor = res.getShort("replicationFactor");
- final int partitions = res.getInt("partitions");
- final long numRecords = res.getLong("numRecords");
- final Integer recordSize = res.getInt("recordSize");
- final int throughput = res.getInt("throughput");
- final List producerProps = res.getList("producerConfig");
- final List consumerProps = res.getList("consumerConfig");
- final String producerConfig = res.getString("producerConfigFile");
- final String consumerConfig = res.getString("consumerConfigFile");
- final Boolean randomGroup = res.getBoolean("randomConsGroup");
-
- if (recordSize < 16) {
- throw new ArgumentParserException(
- "Smaller record size supported by verifier is 16 bytes", parser);
- }
-
- if (producerProps == null && producerConfig == null) {
- throw new ArgumentParserException(
- "Either --producer-props or --producer.config must be specified.",
- parser);
- }
-
- final Properties pProps = createProperties(producerConfig, producerProps);
- final Properties cProps = createProperties(consumerConfig, consumerProps);
- pProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
- cProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
- if (randomGroup) {
- String group
- = "verifier-group-" + RandomStringUtils.randomAlphabetic(8);
- cProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group);
- logger.info("Using consumer group - {}", group);
- }
- final Properties adminProperties = new Properties();
- adminProperties.setProperty(
- AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
-
- try (AdminClient admin = AdminClient.create(adminProperties)) {
- admin.listTopics()
- .names()
- .thenApply((topics) -> {
- if (topics.contains(topicName)) {
- logger.info("Topic '{}' already exists", topicName);
- return KafkaFuture.completedFuture(null);
- }
- NewTopic nTopic
- = new NewTopic(topicName, partitions, replicationFactor);
- logger.info(
- "Creating topic {} with RF: {} and {} partitions", topicName,
- replicationFactor, partitions);
- return admin.createTopics(Collections.singletonList(nTopic));
- })
- .get();
-
- Set topics = admin.listTopics().names().get();
- while (!topics.contains(topicName)) {
- topics = admin.listTopics().names().get();
- }
- }
-
- final Expectations ex = new Expectations();
- final ProducerStats pStats = new ProducerStats(numRecords, 2000, ex);
- final Producer producer = new Producer(
- topicName, numRecords, recordSize, throughput, pProps, pStats);
- final Consumer consumer = new Consumer(topicName, numRecords, cProps, ex);
- final CompletableFuture cf = consumer.startConsumer();
- final CompletableFuture pf = producer.startProducer();
-
- CompletableFuture.allOf(pf, cf)
- .thenRun(() -> { producer.finish(); })
- .join();
- ex.finish();
-
- } catch (final ArgumentParserException e) {
- if (args.length == 0) {
- parser.printHelp();
- Exit.exit(0);
- } else {
- parser.handleError(e);
- Exit.exit(1);
- }
- }
- }
-}
diff --git a/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/VerifierCallback.java b/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/VerifierCallback.java
deleted file mode 100644
index a961d194b0f4..000000000000
--- a/tests/java/kafka-verifier/src/main/java/io/vectorized/kafka/VerifierCallback.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package io.vectorized.kafka;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.RecordMetadata;
-
-public final class VerifierCallback implements Callback {
- private final long start;
- private final int iteration;
- private final int bytes;
- private final ProducerStats stats;
- private final long recordTs;
- private final long recordCnt;
-
- public VerifierCallback(
- int iter, long start, int bytes, ProducerStats stats, long recordTs,
- long recordCnt) {
- this.start = start;
- this.stats = stats;
- this.iteration = iter;
- this.bytes = bytes;
- this.recordTs = recordTs;
- this.recordCnt = recordCnt;
- }
-
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- long now = System.currentTimeMillis();
- int latency = (int)(now - start);
- this.stats.record(
- iteration, latency, bytes, now, metadata, recordTs, recordCnt);
-
- if (exception != null) {
- exception.printStackTrace();
- }
- }
-}
diff --git a/tests/java/kafka-verifier/src/main/resources/log4j.properties b/tests/java/kafka-verifier/src/main/resources/log4j.properties
deleted file mode 100644
index d20efbb3b643..000000000000
--- a/tests/java/kafka-verifier/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,12 +0,0 @@
-# Root logger option
-log4j.rootLogger=INFO, stdout
-
-log4j.logger.org.apache.kafka=OFF
-log4j.logger.io.vectorized=INFO
-
-
-# Direct log messages to stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
\ No newline at end of file
diff --git a/tests/java/tx-verifier/pom.xml b/tests/java/verifiers/pom.xml
similarity index 75%
rename from tests/java/tx-verifier/pom.xml
rename to tests/java/verifiers/pom.xml
index e183c83e904d..223b48a17167 100644
--- a/tests/java/tx-verifier/pom.xml
+++ b/tests/java/verifiers/pom.xml
@@ -4,10 +4,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4.0.0
io.vectorized
- tx-verifier
+ verifiers
jar
1.0-SNAPSHOT
- tx-verifier
+ verifiers
UTF-8
11
@@ -15,6 +15,16 @@
${project.basedir}/target
+
+ com.sparkjava
+ spark-core
+ 2.9.3
+
+
+ com.google.code.gson
+ gson
+ 2.8.8
+
org.apache.kafka
kafka-clients
@@ -36,14 +46,7 @@
${binDir}
- tx-verifier
-
-
-
- io.vectorized.kafka.Verifier
-
-
-
+ verifiers
false
diff --git a/tests/java/verifiers/src/main/java/io/vectorized/reads_writes/App.java b/tests/java/verifiers/src/main/java/io/vectorized/reads_writes/App.java
new file mode 100644
index 000000000000..24fb45f797b6
--- /dev/null
+++ b/tests/java/verifiers/src/main/java/io/vectorized/reads_writes/App.java
@@ -0,0 +1,108 @@
+package io.vectorized.reads_writes;
+import static spark.Spark.*;
+
+import com.google.gson.Gson;
+import java.util.HashMap;
+import spark.*;
+
+// java -cp $(pwd)/target/uber-1.0-SNAPSHOT.jar:$(pwd)/target/dependency/*
+// io.vectorized.reads_writes.App
+// HTTP control server (port 8080) that starts, inspects and stops named read/write workloads.
+public class App {
+  public static class InitBody { // JSON body of POST /start, deserialized by Gson
+    public String name;
+    public String brokers;
+    public String topic;
+    public int partitions;
+    public int read_write_loop_slack = 1000;
+  }
+  // Counters returned by GET /info.
+  public static class Metrics {
+    public long total_writes = 0;
+    public long total_reads = 0;
+    public long min_writes = 0;
+    public long min_reads = 0;
+  }
+  // Renders a route's return value as JSON; static since it uses nothing from the enclosing App.
+  public static class JsonTransformer implements ResponseTransformer {
+    private final Gson gson = new Gson();
+    @Override
+    public String render(Object model) {
+      return gson.toJson(model);
+    }
+  }
+  // Workloads keyed by the name supplied to POST /start; GET /wait removes the entry.
+  HashMap<String, Workload> workloads = new HashMap<>();
+  // Registers all HTTP routes on port 8080.
+  void run() throws Exception {
+    port(8080);
+    // Liveness probe: always answers 200 with an empty body.
+    get("/ping", (req, res) -> {
+      res.status(200);
+      return "";
+    });
+    // Returns the metrics of the workload named by the "name" query parameter as JSON.
+    get("/info", "application/json", (req, res) -> {
+      try {
+        if (!workloads.containsKey(req.queryParams("name"))) {
+          throw new Exception("can't find: " + req.queryParams("name"));
+        }
+        return workloads.get(req.queryParams("name")).getMetrics();
+      } catch (Exception e) {
+        System.out.println(e);
+        e.printStackTrace();
+        throw e;
+      }
+    }, new JsonTransformer());
+    // Creates a workload from the JSON request body and starts it (replacing any same-named entry).
+    post("/start", (req, res) -> {
+      try {
+        Gson gson = new Gson();
+        InitBody params = gson.fromJson(req.body(), InitBody.class);
+
+        workloads.put(params.name, new Workload(params));
+        workloads.get(params.name).start();
+
+        res.status(200);
+        return "";
+      } catch (Exception e) {
+        System.out.println(e);
+        e.printStackTrace();
+        throw e;
+      }
+    });
+    // Asks the named workload to stop; does not wait for its threads to finish.
+    post("/stop", (req, res) -> {
+      try {
+        if (!workloads.containsKey(req.queryParams("name"))) {
+          throw new Exception("can't find: " + req.queryParams("name"));
+        }
+        workloads.get(req.queryParams("name")).stop();
+      } catch (Exception e) {
+        System.out.println(e);
+        e.printStackTrace();
+        throw e;
+      }
+      res.status(200);
+      return "";
+    });
+    // Blocks until the named workload's threads have finished, then forgets the workload.
+    get("/wait", (req, res) -> {
+      try {
+        if (!workloads.containsKey(req.queryParams("name"))) {
+          throw new Exception("can't find: " + req.queryParams("name"));
+        }
+        workloads.get(req.queryParams("name")).waitStopped();
+        workloads.remove(req.queryParams("name"));
+      } catch (Exception e) {
+        System.out.println(e);
+        e.printStackTrace();
+        throw e;
+      }
+      res.status(200);
+      return "";
+    });
+  }
+  // Entry point: starts the HTTP server.
+  public static void main(String[] args) throws Exception { new App().run(); }
+}
diff --git a/tests/java/verifiers/src/main/java/io/vectorized/reads_writes/Workload.java b/tests/java/verifiers/src/main/java/io/vectorized/reads_writes/Workload.java
new file mode 100644
index 000000000000..1a8ac2b6d8fe
--- /dev/null
+++ b/tests/java/verifiers/src/main/java/io/vectorized/reads_writes/Workload.java
@@ -0,0 +1,437 @@
+package io.vectorized.reads_writes;
+import java.lang.Thread;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.Semaphore;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+public class Workload {
+ static enum OpStatus { WRITING, UNKNOWN, WRITTEN, SEEN, SKIPPED } // lifecycle of one written record
+ // Bookkeeping for a single record, shared between the writer and the reader of its partition.
+ static class OpInfo {
+ public long id; // unique id from get_oid(); also used as the record's key and value
+ public long started_us; // System.nanoTime() / 1000 taken when the op was created
+ public OpStatus status; // starts as WRITING; the reader moves WRITING/UNKNOWN to SEEN
+ public int partition;
+ public long offset; // -1 until known
+ }
+ // Per-partition counters and drain state.
+ static class WriterStat {
+ public long countWritten = 0; // NOTE(review): not updated in the visible code; presumably by SendCallback
+ public long countRead = 0; // records the reader has validated
+ public long lastOffset = -1; // reader stops once its offset reaches this after writers stop
+ public int inflights = 0; // incremented by the writer before each send
+ public boolean draining = false; // set by the writer once its send loop has ended
+ public Semaphore drained = new Semaphore(0); // writer waits on this when sends are still in flight
+ }
+
+ static volatile long started_s = -1; // epoch seconds when the first workload started; -1 until then
+ static int workload_id_gen = 0; // next id handed out by get_workload_id()
+ static synchronized int get_workload_id() { return workload_id_gen++; }
+ // Identity and lifecycle flags; the flags are volatile because they are read without the lock.
+ private final int workload_id;
+ public volatile boolean is_active = false;
+ public volatile boolean are_writers_stopped = false;
+ public volatile boolean are_readers_stopped = false;
+ // Start parameters, worker threads, and counters (last_oid and count change only in synchronized methods).
+ private volatile App.InitBody args;
+ private volatile ArrayList<Thread> write_threads;
+ private volatile ArrayList<Thread> read_threads;
+ private long last_oid = 0;
+ private long count = 0;
+ // Per-partition state, (re)created by start() and used under synchronized (this) by the workers.
+ HashMap<Integer, Queue<Long>> oids; // partition -> ids of writes not yet read, in write order
+ HashMap<Long, OpInfo> ops; // op id -> op still awaiting its read
+ HashMap<Integer, Semaphore> rsems; // partition -> permits that pace the writer
+ HashMap<Integer, WriterStat> wstat; // partition -> counters and drain state
+
+ public Workload(App.InitBody args) { // only records parameters; threads are created in start()
+   workload_id = Workload.get_workload_id();
+   this.args = args;
+ }
+
+ // Aggregates per-partition read/write counters into totals and minimums, logging one "stat" row each.
+ public App.Metrics getMetrics() {
+   var result = new App.Metrics(); // totals already start at 0
+   result.min_reads = Long.MAX_VALUE;
+   result.min_writes = Long.MAX_VALUE;
+   synchronized (this) {
+     int p = 0;
+     while (p < args.partitions) {
+       var ws = wstat.get(p);
+       result.total_writes += ws.countWritten;
+       result.min_writes = Math.min(ws.countWritten, result.min_writes);
+       result.total_reads += ws.countRead;
+       result.min_reads = Math.min(ws.countRead, result.min_reads);
+       log(p++, "stat\t" + ws.countWritten + "\t" + ws.countRead);
+     }
+   }
+   return result;
+ }
+
+ // Activates the workload and launches one writer thread and one reader thread per partition.
+ public void start() throws Exception {
+ is_active = true;
+ // The first workload to start fixes the time origin and prints the log column header.
+ if (started_s == -1) {
+ started_s = System.currentTimeMillis() / 1000;
+ System.out.println(
+ "#workload\tlogical_time\tseconds_since_start\tpartition\tlog_type\tdetails");
+ }
+ log(-1, "start\t" + args.name);
+
+ write_threads = new ArrayList<>();
+ read_threads = new ArrayList<>();
+ oids = new HashMap<>();
+ ops = new HashMap<>();
+ rsems = new HashMap<>();
+ wstat = new HashMap<>();
+ // Create the per-partition state and the (not yet started) worker threads.
+ for (int partition = 0; partition < args.partitions; partition++) {
+ oids.put(partition, new LinkedList<>());
+ rsems.put(partition, new Semaphore(0));
+ wstat.put(partition, new WriterStat());
+
+ final int p = partition;
+ write_threads.add(new Thread(() -> {
+ try {
+ writeProcess(p);
+ } catch (Exception e) {
+ synchronized (this) {
+ System.out.println("=== write process error");
+ System.out.println(e);
+ e.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+ }));
+ // Same failure policy for the reader: print the error and terminate the JVM.
+ read_threads.add(new Thread(() -> {
+ try {
+ readProcess(p);
+ } catch (Exception e) {
+ synchronized (this) {
+ System.out.println("=== read process error");
+ System.out.println(e);
+ e.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+ }));
+ }
+ // All writer threads are started before any reader thread.
+ for (var th : write_threads) {
+ th.start();
+ }
+
+ for (var th : read_threads) {
+ th.start();
+ }
+ }
+
+ // Requests shutdown: clears is_active, then hands each partition one permit to wake a blocked writer.
+ public void stop() throws Exception {
+   is_active = false;
+   log(-1, "stop requested");
+   synchronized (this) {
+     int p = 0;
+     while (p < args.partitions) {
+       rsems.get(p++).release();
+     }
+   }
+ }
+
+ // Joins the writer threads and then the reader threads; stop() must have been called first.
+ public void waitStopped() throws Exception {
+ if (is_active) {
+ throw new Exception("wasn't asked to stop");
+ }
+ if (are_readers_stopped) { // already fully stopped by an earlier call
+ return;
+ }
+ for (var th : write_threads) {
+ th.join();
+ }
+ log(-1, "writers stopped");
+ are_writers_stopped = true; // readers only leave their loop after observing this flag
+ for (var th : read_threads) {
+ th.join();
+ }
+ log(-1, "readers stopped");
+ are_readers_stopped = true;
+ }
+
+ // Writer loop for one partition: sends records whose key and value are the op id until stop() is
+ // requested, waits for in-flight sends to complete, then closes the producer.
+ private void writeProcess(int partition) throws Exception {
+   Properties props = new Properties();
+   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, args.brokers);
+   props.put(ProducerConfig.ACKS_CONFIG, "all");
+   props.put(
+       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+       "org.apache.kafka.common.serialization.StringSerializer");
+   props.put(
+       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+       "org.apache.kafka.common.serialization.StringSerializer");
+   props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
+   props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
+   props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+   props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+   Producer<String, String> producer = new KafkaProducer<>(props);
+   Semaphore rsem;
+   WriterStat stat = null;
+   synchronized (this) {
+     rsem = rsems.get(partition);
+     stat = wstat.get(partition);
+   }
+   boolean is_feedback_started = false;
+   while (is_active) {
+     // Past the slack, every write first takes a permit; the reader releases one per record read.
+     if (stat.countWritten >= args.read_write_loop_slack) {
+       if (!is_feedback_started) {
+         is_feedback_started = true;
+         log(partition, "feedback");
+       }
+       rsem.acquire();
+     }
+
+     OpInfo op = new OpInfo();
+     op.id = get_oid();
+     op.started_us = System.nanoTime() / 1000;
+     op.status = OpStatus.WRITING;
+     op.partition = partition;
+     op.offset = -1;
+     synchronized (this) {
+       stat.inflights++;
+       ops.put(op.id, op);
+       oids.get(partition).add(op.id);
+       log(partition, "w\t" + op.id);
+     }
+
+     var callback = new SendCallback(op, stat);
+     ProducerRecord<String, String> record
+         = new ProducerRecord<>(args.topic, partition, "" + op.id, "" + op.id);
+     try {
+       producer.send(record, callback);
+     } catch (Exception e) {
+       // A send() that throws synchronously is reported through the same callback.
+       callback.onCompletion(null, e);
+     }
+   }
+   boolean has_inflights;
+   synchronized (this) {
+     stat.draining = true;
+     has_inflights = stat.inflights != 0;
+   }
+   if (has_inflights) {
+     stat.drained.acquire();
+   }
+   producer.close(); // previously leaked: frees the producer's connections, buffers and I/O thread
+ }
+
+ // Reader loop for one partition: consumes the topic from the earliest offset and checks each record
+ // against the ops registered by the writer; any mismatch ends in violation(), which exits the JVM.
+ private void readProcess(int partition) throws Exception {
+   var tp = new TopicPartition(args.topic, partition);
+   var tps = Collections.singletonList(tp);
+   Properties props = new Properties();
+   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args.brokers);
+   props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+   props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+   props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+   props.put(
+       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+       "org.apache.kafka.common.serialization.StringDeserializer");
+   props.put(
+       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+       "org.apache.kafka.common.serialization.StringDeserializer");
+
+   Queue<Long> oids;
+   Semaphore rsem;
+   WriterStat stat;
+   synchronized (this) {
+     oids = this.oids.get(partition);
+     rsem = rsems.get(partition);
+     stat = wstat.get(partition);
+   }
+
+   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+   consumer.assign(tps);
+   long offset = -1;
+   while (true) {
+     // Once the writers are done, finish as soon as the consumed offset has reached stat.lastOffset.
+     if (are_writers_stopped) {
+       synchronized (this) {
+         if (stat.lastOffset <= offset) {
+           break;
+         }
+       }
+     }
+
+     log(partition, "r");
+     ConsumerRecords<String, String> records
+         = consumer.poll(Duration.ofMillis(10000));
+     for (var record : records) {
+       offset = record.offset();
+       long oid = Long.parseLong(record.value());
+
+       synchronized (this) {
+         log(partition, "o\t" + oid + "\t" + offset);
+         if (!ops.containsKey(oid)) {
+           violation(partition, "can't read what isn't written id:" + oid);
+         }
+         var op = ops.get(oid);
+         switch (op.status) {
+           case SEEN:
+           case SKIPPED:
+             violation(
+                 partition, "unexpected status: " + op.status.name() + " " + oid
+                     + "@" + offset);
+             break;
+           case UNKNOWN:
+           case WRITING:
+             op.status = OpStatus.SEEN;
+             op.offset = offset;
+             break;
+           case WRITTEN:
+             if (op.offset != offset) {
+               violation(
+                   partition, "read " + oid + "@" + offset + " while expecting @"
+                       + op.offset);
+             }
+             break;
+         }
+         rsem.release();
+         // Pending ids smaller than the one just read were not observed before it: treat as skipped.
+         while (oids.size() > 0 && oids.element() < oid) {
+           var skipped_id = oids.element();
+           oids.remove();
+           var skipped = ops.get(skipped_id);
+           switch (skipped.status) {
+             case SEEN:
+             case SKIPPED:
+             case WRITTEN:
+               violation(
+                   partition, "unexpected status for skipped record: "
+                       + skipped.status.name()
+                       + " oid:" + skipped.id);
+               break;
+             case UNKNOWN:
+             case WRITING:
+               // Mark the skipped op itself, not the op that was just read.
+               skipped.status = OpStatus.SKIPPED;
+               break;
+           }
+           ops.remove(skipped_id);
+         }
+         if (oids.size() == 0) {
+           violation(partition, "len of pending reads can't be 0");
+         }
+         if (oids.element() != oid) {
+           violation(
+               partition,
+               "unexpected element in pending reads oid:" + oids.element());
+         }
+         oids.remove();
+         ops.remove(oid);
+         stat.countRead++;
+       }
+     }
+   }
+   consumer.close(); // previously leaked: frees the consumer's connections once the partition is read
+ }
+
+  /**
+   * Prints one tab-separated trace line to stdout:
+   * workload id, running line counter, seconds since {@code started_s},
+   * partition and the caller's message. The counter is advanced on every call.
+   */
+  private synchronized void log(int partition, String msg) {
+    long elapsed_s = System.currentTimeMillis() / 1000 - started_s;
+    var line = new StringBuilder();
+    line.append(workload_id).append("\t");
+    line.append(count++).append("\t");
+    line.append(elapsed_s).append("\t");
+    line.append(partition).append("\t");
+    line.append(msg);
+    System.out.println(line.toString());
+  }
+
+  /**
+   * Reports a consistency violation and terminates the JVM with exit code 1.
+   * The line printed is: line counter, seconds since {@code started_s},
+   * partition, the literal word {@code violation}, and the message.
+   */
+  private synchronized void violation(int partition, String msg) {
+    long elapsed_s = System.currentTimeMillis() / 1000 - started_s;
+    var line = new StringBuilder();
+    line.append(count++).append("\t");
+    line.append(elapsed_s).append("\t");
+    line.append(partition);
+    line.append("\tviolation\t").append(msg);
+    System.out.println(line.toString());
+    System.exit(1);
+  }
+
+  /** Advances {@code last_oid} by one under the workload monitor and returns the new value. */
+  synchronized long get_oid() {
+    this.last_oid += 1;
+    return this.last_oid;
+  }
+
+  /**
+   * Producer callback for a single write operation. Under the {@code Workload}
+   * monitor it folds the outcome of the send into the operation's status,
+   * updates the per-partition writer counters and releases {@code stat.drained}
+   * when the last in-flight write of a draining writer completes.
+   */
+  class SendCallback implements Callback {
+    private OpInfo op;
+    private WriterStat stat;
+
+    SendCallback(OpInfo op, WriterStat stat) {
+      this.stat = stat;
+      this.op = op;
+    }
+
+    /**
+     * @param meta metadata of the acknowledged record; only read when {@code e} is null
+     * @param e    the send failure, or null when the write was acknowledged
+     */
+    public void onCompletion(RecordMetadata meta, Exception e) {
+      synchronized (Workload.this) {
+        if (e != null) {
+          onFailure(e);
+        } else {
+          onAck(meta);
+        }
+
+        stat.inflights--;
+        boolean nothingInflight = stat.inflights == 0;
+        if (nothingInflight && stat.draining) {
+          stat.drained.release();
+        }
+      }
+    }
+
+    /** Acknowledged write: validate the status transition and record the offset. */
+    private void onAck(RecordMetadata meta) {
+      log(op.partition, "d\t" + op.id + "\t" + meta.offset());
+      switch (op.status) {
+      case WRITING:
+        op.status = OpStatus.WRITTEN;
+        op.offset = meta.offset();
+        break;
+      case SEEN:
+        // The reader got there first; both sides must agree on the offset.
+        if (op.offset != meta.offset()) {
+          violation(
+              op.partition,
+              "oid:" + op.id + " was seen with conflicting offsets; seen: "
+                  + op.offset + " written: " + meta.offset());
+        }
+        break;
+      case SKIPPED:
+      case UNKNOWN:
+      case WRITTEN:
+        violation(
+            op.partition,
+            "just written record can't have status: " + op.status.name());
+        break;
+      }
+
+      stat.countWritten++;
+      stat.lastOffset = Math.max(stat.lastOffset, op.offset);
+    }
+
+    /** Failed write: dump the error and downgrade a WRITING op to UNKNOWN. */
+    private void onFailure(Exception e) {
+      log(op.partition, "e\t" + op.id);
+      System.out.println("=== Error on write");
+      System.out.println(e);
+      e.printStackTrace(System.out);
+      if (op.status == OpStatus.UNKNOWN) {
+        Workload.this.violation(
+            op.partition,
+            "impossible situation: failed write can't already have unknown status");
+      } else if (op.status == OpStatus.WRITING) {
+        op.status = OpStatus.UNKNOWN;
+      }
+    }
+  }
+}
diff --git a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/RetryableException.java b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/RetryableException.java
similarity index 75%
rename from tests/java/tx-verifier/src/main/java/io/vectorized/kafka/RetryableException.java
rename to tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/RetryableException.java
index c821ad3e6d53..36f5bd9ae9a1 100644
--- a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/RetryableException.java
+++ b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/RetryableException.java
@@ -1,4 +1,4 @@
-package io.vectorized.kafka;
+package io.vectorized.tx_verifier;
public class RetryableException extends Exception {
public RetryableException(String msg) { super(msg); }
diff --git a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/SimpleProducer.java b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/SimpleProducer.java
similarity index 97%
rename from tests/java/tx-verifier/src/main/java/io/vectorized/kafka/SimpleProducer.java
rename to tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/SimpleProducer.java
index b905793a1d90..378b2a0b9648 100644
--- a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/SimpleProducer.java
+++ b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/SimpleProducer.java
@@ -1,4 +1,4 @@
-package io.vectorized.kafka;
+package io.vectorized.tx_verifier;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
diff --git a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxConsumer.java b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxConsumer.java
similarity index 99%
rename from tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxConsumer.java
rename to tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxConsumer.java
index 02af59170758..5695a95904a9 100644
--- a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxConsumer.java
+++ b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxConsumer.java
@@ -1,4 +1,4 @@
-package io.vectorized.kafka;
+package io.vectorized.tx_verifier;
import java.lang.Math;
import java.lang.Thread;
diff --git a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxProducer.java b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxProducer.java
similarity index 98%
rename from tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxProducer.java
rename to tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxProducer.java
index 84f0cc87d6b8..c49050d92096 100644
--- a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxProducer.java
+++ b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxProducer.java
@@ -1,4 +1,4 @@
-package io.vectorized.kafka;
+package io.vectorized.tx_verifier;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
diff --git a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxReadsWritesTest.java b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxReadsWritesTest.java
similarity index 99%
rename from tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxReadsWritesTest.java
rename to tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxReadsWritesTest.java
index c3bf5821647f..db66aab04420 100644
--- a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxReadsWritesTest.java
+++ b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxReadsWritesTest.java
@@ -1,4 +1,4 @@
-package io.vectorized.kafka;
+package io.vectorized.tx_verifier;
import java.io.*;
import java.lang.Thread;
diff --git a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxRecord.java b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxRecord.java
similarity index 90%
rename from tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxRecord.java
rename to tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxRecord.java
index a3fdbabc69d4..1e120309bc48 100644
--- a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxRecord.java
+++ b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxRecord.java
@@ -1,4 +1,4 @@
-package io.vectorized.kafka;
+package io.vectorized.tx_verifier;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
diff --git a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxStream.java b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxStream.java
similarity index 99%
rename from tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxStream.java
rename to tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxStream.java
index ec244dd59449..ea6a4f8b2d68 100644
--- a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/TxStream.java
+++ b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/TxStream.java
@@ -1,4 +1,4 @@
-package io.vectorized.kafka;
+package io.vectorized.tx_verifier;
import java.lang.Math;
import java.util.ArrayList;
diff --git a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/Verifier.java b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/Verifier.java
similarity index 99%
rename from tests/java/tx-verifier/src/main/java/io/vectorized/kafka/Verifier.java
rename to tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/Verifier.java
index ed8be2f51f6d..88d4d7be405d 100644
--- a/tests/java/tx-verifier/src/main/java/io/vectorized/kafka/Verifier.java
+++ b/tests/java/verifiers/src/main/java/io/vectorized/tx_verifier/Verifier.java
@@ -1,4 +1,4 @@
-package io.vectorized.kafka;
+package io.vectorized.tx_verifier;
import static java.util.Map.entry;
diff --git a/tests/rptest/remote_scripts/control/alive.sh b/tests/rptest/remote_scripts/control/alive.sh
new file mode 100755
index 000000000000..c1cc090dda59
--- /dev/null
+++ b/tests/rptest/remote_scripts/control/alive.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Usage: alive.sh NAME
+# Prints YES when the pid stored in /opt/remote/var/NAME.pid belongs to a
+# running process, NO otherwise. Always exits 0.
+set -e
+
+pidfile="/opt/remote/var/$1.pid"
+if [ ! -f "$pidfile" ]; then
+    echo "NO"
+    exit 0
+fi
+
+pid=$(cat "$pidfile")
+# Test the quoted value: an unquoted empty $pid would expand to `[ == "" ]`,
+# which is a test syntax error rather than a clean "empty" check.
+if [ -z "$pid" ]; then
+    echo "NO"
+    exit 0
+fi
+
+# Only the exit status of ps matters; its output is discarded.
+if ps -p "$pid" -o comm= >/dev/null; then
+    echo "YES"
+    exit 0
+fi
+
+echo "NO"
+exit 0
diff --git a/tests/rptest/remote_scripts/control/start.sh b/tests/rptest/remote_scripts/control/start.sh
new file mode 100755
index 000000000000..487ed2e65c2a
--- /dev/null
+++ b/tests/rptest/remote_scripts/control/start.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+# Usage: start.sh NAME COMMAND
+# Runs COMMAND detached via `nohup bash -c`, with stdout and stderr going to
+# /opt/remote/var/NAME.log, and stores the background pid in NAME.pid.
+set -e
+
+var_dir=/opt/remote/var
+mkdir -p "$var_dir"
+cd "$var_dir"
+# Paths are quoted so a NAME containing spaces or glob characters stays one word.
+nohup bash -c "$2" >"$var_dir/$1.log" 2>&1 &
+echo $! >"$var_dir/$1.pid"
diff --git a/tests/rptest/remote_scripts/control/stop.sh b/tests/rptest/remote_scripts/control/stop.sh
new file mode 100755
index 000000000000..9c8bd6dd4f95
--- /dev/null
+++ b/tests/rptest/remote_scripts/control/stop.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Usage: stop.sh NAME
+# Sends SIGKILL to the process recorded in /opt/remote/var/NAME.pid when it is
+# still running, then removes the pid file. A missing pid file is not an error.
+set -e
+
+pidfile="/opt/remote/var/$1.pid"
+if [ ! -f "$pidfile" ]; then
+    exit 0
+fi
+
+pid=$(cat "$pidfile")
+# Test the quoted value: an unquoted empty $pid would expand to `[ == "" ]`,
+# which is a test syntax error rather than a clean "empty" check.
+if [ -z "$pid" ]; then
+    exit 0
+fi
+
+if ps -p "$pid"; then
+    kill -9 "$pid"
+fi
+
+rm "$pidfile"
diff --git a/tests/rptest/services/rw_verifier.py b/tests/rptest/services/rw_verifier.py
new file mode 100644
index 000000000000..732641ad5bbb
--- /dev/null
+++ b/tests/rptest/services/rw_verifier.py
@@ -0,0 +1,212 @@
+# Copyright 2020 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from ducktape.services.service import Service
+import requests
+from time import sleep
+import sys
+import json
+from rptest.util import wait_until
+
+
+class CrushedException(Exception):
+    """Raised by RWVerifier when the remote rw verifier process is no longer alive."""
+    pass
+
+
+class ConsistencyViolationException(Exception):
+    """Raised with the offending log line when the verifier log contains "violation"."""
+    pass
+
+
+# Combined stdout/stderr of the verifier process on the remote node; this is
+# the file collected as a service log and scanned for "violation" lines.
+OUTPUT_LOG = "/opt/remote/var/rw.log"
+
+
+class RWVerifier(Service):
+    """
+    Ducktape service that runs the `io.vectorized.reads_writes.App` verifier on
+    one node and drives it through its HTTP API on port 8080.
+
+    The process is managed with the scripts under /opt/remote/control
+    (start.sh / alive.sh / stop.sh, service name `rw`); its output goes to
+    OUTPUT_LOG, which is scanned for "violation" lines.
+    """
+    logs = {"rw_stdout_stderr": {"path": OUTPUT_LOG, "collect_default": True}}
+
+    # Upper bound for a single /ping request, so that readiness polling cannot
+    # block forever on a host that accepts the connection but never answers.
+    PING_TIMEOUT_SEC = 5
+
+    def __init__(self, context, redpanda):
+        """
+        :param context: ducktape test context
+        :param redpanda: redpanda service; only its logger is used here
+        """
+        super(RWVerifier, self).__init__(context, num_nodes=1)
+        self._redpanda = redpanda
+        self._is_done = False
+        # Set by start_node(); every remote_* call targets this node.
+        self._node = None
+
+    def is_alive(self, node):
+        """Return True when alive.sh reports the `rw` process as running on `node`."""
+        output = node.account.ssh_output(
+            "bash /opt/remote/control/alive.sh rw")
+        return "YES" in output.decode("utf-8")
+
+    def is_ready(self):
+        """Return True when the HTTP API answers /ping, False while it is unreachable."""
+        try:
+            self.remote_ping()
+            return True
+        except (requests.exceptions.ConnectionError,
+                requests.exceptions.Timeout):
+            # Not listening yet, or too slow to answer: treat both as "not ready".
+            return False
+
+    ### Service overrides
+
+    def start_node(self, node, timeout_sec=10):
+        """
+        Launch the verifier on `node`, then wait until the process is alive and
+        until its HTTP API answers; each wait is bounded by `timeout_sec`.
+        """
+        node.account.ssh(
+            "bash /opt/remote/control/start.sh rw \"java -cp /opt/verifiers/verifiers.jar io.vectorized.reads_writes.App\""
+        )
+        sleep(1)
+        wait_until(
+            lambda: self.is_alive(node),
+            timeout_sec=timeout_sec,
+            backoff_sec=1,
+            err_msg=
+            f"rw service {node.account.hostname} failed to start within {timeout_sec} sec",
+            retry_on_exc=False)
+        # is_ready() goes through self._node, so it must be set before polling.
+        self._node = node
+        wait_until(
+            lambda: self.is_ready(),
+            timeout_sec=timeout_sec,
+            backoff_sec=1,
+            err_msg=
+            f"rw service {node.account.hostname} failed to become ready within {timeout_sec} sec",
+            retry_on_exc=False)
+
+    def stop_node(self, node):
+        """Stop the verifier on `node`, then fail if its log recorded a violation."""
+        node.account.ssh("bash /opt/remote/control/stop.sh rw")
+        self.raise_on_violation(node)
+
+    def clean_node(self, node):
+        """Nothing is cleaned up on the node."""
+        pass
+
+    def wait_node(self, node, timeout_sec=sys.maxsize):
+        """Block until the verifier process on `node` is gone; returns True."""
+        wait_until(
+            lambda: not (self.is_alive(node)),
+            timeout_sec=timeout_sec,
+            backoff_sec=1,
+            err_msg=
+            f"rw service {node.account.hostname} failed to stop within {timeout_sec} sec",
+            retry_on_exc=False)
+        return True
+
+    #########################################
+
+    def _url(self, path):
+        """Build the verifier API URL for `path` on the node set by start_node()."""
+        ip = self._node.account.hostname
+        return f"http://{ip}:8080/{path}"
+
+    @staticmethod
+    def _raise_for_status(r):
+        """Raise a generic Exception carrying status and body unless `r` is a 200."""
+        if r.status_code != 200:
+            raise Exception(
+                f"unexpected status code: {r.status_code} content: {r.content}"
+            )
+
+    def _raise_if_crashed(self):
+        """
+        Raise when the verifier process is gone: ConsistencyViolationException
+        if its log holds a violation, CrushedException otherwise.
+        """
+        if not self.is_alive(self._node):
+            self.raise_on_violation(self._node)
+            raise CrushedException(
+                "rw_verifier is crushed (it often happens on the consistency violation)"
+            )
+
+    def remote_ping(self):
+        """GET /ping; raises on a non-200 answer or when requests fails or times out."""
+        r = requests.get(self._url("ping"), timeout=self.PING_TIMEOUT_SEC)
+        if r.status_code != 200:
+            raise Exception(f"unexpected status code: {r.status_code}")
+
+    def remote_start(self,
+                     name,
+                     connection,
+                     topic,
+                     partitions,
+                     read_write_loop_slack=1000):
+        """POST /start with the workload description as JSON; raises on non-200."""
+        r = requests.post(self._url("start"),
+                          json={
+                              "name": name,
+                              "brokers": connection,
+                              "topic": topic,
+                              "partitions": partitions,
+                              "read_write_loop_slack": read_write_loop_slack
+                          })
+        self._raise_for_status(r)
+
+    def remote_stop(self, name):
+        """POST /stop for workload `name`; raises on non-200."""
+        # `params` lets requests URL-encode the name instead of splicing it raw.
+        r = requests.post(self._url("stop"), params={"name": name})
+        self._raise_for_status(r)
+
+    def remote_wait(self, name):
+        """GET /wait for workload `name`; raises on non-200."""
+        r = requests.get(self._url("wait"), params={"name": name})
+        self._raise_for_status(r)
+
+    def remote_info(self, name):
+        """GET /info for workload `name` and return the decoded JSON body."""
+        url = self._url("info")
+        self._redpanda.logger.debug(f"Dispatching {url}?name={name}")
+        r = requests.get(url, params={"name": name})
+        self._raise_for_status(r)
+        info = r.json()
+        self._redpanda.logger.debug(f"Received {json.dumps(info)}")
+        return info
+
+    def ensure_progress(self, name, delta, timeout_sec):
+        """
+        Wait until `min_writes`, and then `min_reads`, of workload `name` have
+        each grown by at least `delta` compared to a snapshot taken on entry.
+        Each of the two waits is bounded by `timeout_sec`.
+        """
+        self._raise_if_crashed()
+
+        old_state = self.remote_info(name)
+        min_reads = old_state["min_reads"]
+        min_writes = old_state["min_writes"]
+
+        def check_writes():
+            new_state = self.remote_info(name)
+            return new_state["min_writes"] - min_writes >= delta
+
+        def check_reads():
+            new_state = self.remote_info(name)
+            return new_state["min_reads"] - min_reads >= delta
+
+        wait_until(
+            check_writes,
+            timeout_sec,
+            2,
+            err_msg=
+            f"writes got stuck: hasn't written {delta} records in {timeout_sec}s"
+        )
+        wait_until(
+            check_reads,
+            timeout_sec,
+            2,
+            err_msg=
+            f"reads got stuck: hasn't read {delta} records in {timeout_sec}s")
+
+    def has_cleared(self, name, threshold, timeout_sec):
+        """
+        Wait (up to `timeout_sec`) until both `min_reads` and `min_writes` of
+        workload `name` have reached `threshold`.
+        """
+        self._raise_if_crashed()
+
+        def check():
+            new_state = self.remote_info(name)
+            return (new_state["min_reads"] >= threshold
+                    and new_state["min_writes"] >= threshold)
+
+        wait_until(
+            check,
+            timeout_sec,
+            2,
+            err_msg=
+            f"num of rw iteration hasn't reached {threshold} in {timeout_sec}")
+
+    def raise_on_violation(self, node):
+        """Raise ConsistencyViolationException for the first "violation" line in OUTPUT_LOG."""
+        self.logger.info(
+            f"Scanning node {node.account.hostname} log for violations...")
+
+        # `|| true` keeps grep's "no match" exit status from failing the ssh call.
+        for line in node.account.ssh_capture(
+                f"grep -e violation {OUTPUT_LOG} || true"):
+            raise ConsistencyViolationException(line)
diff --git a/tests/rptest/tests/end_to_end.py b/tests/rptest/tests/end_to_end.py
index 549af117f93a..8ac8d399460b 100644
--- a/tests/rptest/tests/end_to_end.py
+++ b/tests/rptest/tests/end_to_end.py
@@ -28,11 +28,12 @@
from rptest.services.redpanda_installer import InstallOptions
from rptest.services.redpanda_installer import RedpandaInstaller
from rptest.clients.default import DefaultClient
+from rptest.clients.rpk import RpkTool
+from rptest.services.admin import Admin
from rptest.services.verifiable_consumer import VerifiableConsumer
from rptest.services.verifiable_producer import VerifiableProducer, is_int_with_prefix
from rptest.archival.s3_client import S3Client
from rptest.clients.rpk import RpkTool
-from rptest.clients.rpk import RpkException
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
@@ -134,9 +135,19 @@ def start_redpanda(self,
RedpandaInstaller.HEAD)
self.redpanda.restart_nodes(nodes_to_upgrade)
+ self._admin_client = Admin(self.redpanda)
self._client = DefaultClient(self.redpanda)
self._rpk_client = RpkTool(self.redpanda)
+    def await_controller(self, check=lambda node_id: True, hosts=None):
+        """
+        Wait, with a 30 second timeout, for a stable leader of partition 0 of
+        the `controller` topic in the `redpanda` namespace, and return whatever
+        the admin client's `await_stable_leader` returns.
+
+        :param check: forwarded as-is; presumably a predicate on the leader's
+                      node id (the default accepts any) -- confirm against Admin
+        :param hosts: forwarded as-is to `await_stable_leader`
+        """
+        leader_query = dict(namespace="redpanda",
+                            partition=0,
+                            timeout_s=30,
+                            check=check,
+                            hosts=hosts)
+        return self.admin_client().await_stable_leader("controller",
+                                                       **leader_query)
+
def rpk_client(self):
assert self._rpk_client is not None
return self._rpk_client
@@ -145,6 +156,14 @@ def client(self):
assert self._client is not None
return self._client
+    def admin_client(self):
+        """Return the Admin client created by start_redpanda(); asserts it exists."""
+        assert self._admin_client is not None
+        return self._admin_client
+
@property
def debug_mode(self):
"""
diff --git a/tests/rptest/tests/partition_movement.py b/tests/rptest/tests/partition_movement.py
index fe198a99d844..f92b027b7a83 100644
--- a/tests/rptest/tests/partition_movement.py
+++ b/tests/rptest/tests/partition_movement.py
@@ -106,6 +106,14 @@ def keep(p):
def _wait_post_move(self, topic, partition, assignments, timeout_sec):
admin = Admin(self.redpanda)
+ def derived_done():
+ info = self._get_current_partitions(admin, topic, partition)
+ self.logger.info(
+ f"derived assignments for {topic}-{partition}: {info}")
+ return self._equal_assignments(info, assignments)
+
+ wait_until(derived_done, timeout_sec=timeout_sec, backoff_sec=1)
+
def status_done():
results = []
for n in self.redpanda._started:
@@ -121,17 +129,7 @@ def status_done():
# wait until redpanda reports complete
wait_until(status_done, timeout_sec=timeout_sec, backoff_sec=2)
- def derived_done():
- info = self._get_current_partitions(admin, topic, partition)
- self.logger.info(
- f"derived assignments for {topic}-{partition}: {info}")
- return self._equal_assignments(info, assignments)
-
- wait_until(derived_done, timeout_sec=timeout_sec, backoff_sec=2)
-
def _do_move_and_verify(self, topic, partition, timeout_sec):
- admin = Admin(self.redpanda)
-
_, new_assignment = self._dispatch_random_partition_move(
topic=topic, partition=partition)
diff --git a/tests/rptest/tests/partition_movement_test.py b/tests/rptest/tests/partition_movement_test.py
index 116d8512c92b..497bc783b4a7 100644
--- a/tests/rptest/tests/partition_movement_test.py
+++ b/tests/rptest/tests/partition_movement_test.py
@@ -13,22 +13,19 @@
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until
-from rptest.clients.kafka_cat import KafkaCat
from ducktape.mark import matrix
+from rptest.services.rw_verifier import RWVerifier
from rptest.utils.mode_checks import skip_debug_mode
from rptest.clients.types import TopicSpec
-from rptest.clients.rpk import RpkTool
from rptest.tests.end_to_end import EndToEndTest
from rptest.services.admin import Admin
from rptest.services.redpanda_installer import InstallOptions, RedpandaInstaller
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.util import wait_until_result
from rptest.services.honey_badger import HoneyBadger
-from rptest.services.rpk_producer import RpkProducer
-from rptest.services.kaf_producer import KafProducer
-from rptest.services.rpk_consumer import RpkConsumer
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST, SISettings
+from ducktape.errors import TimeoutError
# Errors we should tolerate when moving partitions around
PARTITION_MOVEMENT_LOG_ERRORS = [
@@ -84,19 +81,18 @@ def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
hb.set_exception(n, 'raftgen_service::failure_probes', 'vote')
topic = "topic-1"
partition = 0
- spec = TopicSpec(name=topic, partition_count=1, replication_factor=3)
- self.client().create_topic(spec)
- admin = Admin(self.redpanda)
+ self.rpk_client().create_topic(topic, 1, 3)
# choose a random topic-partition
self.logger.info(f"selected topic-partition: {topic}-{partition}")
# get the partition's replica set, including core assignments. the kafka
# api doesn't expose core information, so we use the redpanda admin api.
- assignments = self._get_assignments(admin, topic, partition)
+ assignments = self._get_assignments(self._admin_client, topic,
+ partition)
self.logger.info(f"assignments for {topic}-{partition}: {assignments}")
- brokers = admin.get_brokers()
+ brokers = self._admin_client.get_brokers()
# replace all node cores in assignment
for assignment in assignments:
for broker in brokers:
@@ -106,10 +102,11 @@ def test_moving_not_fully_initialized_partition(self, num_to_upgrade):
self.logger.info(
f"new assignments for {topic}-{partition}: {assignments}")
- r = admin.set_partition_replicas(topic, partition, assignments)
+ self._admin_client.set_partition_replicas(topic, partition,
+ assignments)
def status_done():
- info = admin.get_partitions(topic, partition)
+ info = self._admin_client.get_partitions(topic, partition)
self.logger.info(
f"current assignments for {topic}-{partition}: {info}")
converged = self._equal_assignments(info["replicas"], assignments)
@@ -122,7 +119,8 @@ def status_done():
wait_until(status_done, timeout_sec=60, backoff_sec=2)
def derived_done():
- info = self._get_current_partitions(admin, topic, partition)
+ info = self._get_current_partitions(self._admin_client, topic,
+ partition)
self.logger.info(
f"derived assignments for {topic}-{partition}: {info}")
return self._equal_assignments(info, assignments)
@@ -144,104 +142,13 @@ def test_empty(self, num_to_upgrade):
topics = []
for partition_count in range(1, 5):
for replication_factor in (1, 3):
- name = f"topic{len(topics)}"
- spec = TopicSpec(name=name,
- partition_count=partition_count,
- replication_factor=replication_factor)
- topics.append(spec)
-
- for spec in topics:
- self.client().create_topic(spec)
-
- for _ in range(25):
- self._move_and_verify()
-
- @cluster(num_nodes=4,
- log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
- PREV_VERSION_LOG_ALLOW_LIST)
- @matrix(num_to_upgrade=[0, 2])
- def test_static(self, num_to_upgrade):
- """
- Move partitions with data, but no active producers or consumers.
- """
- self.logger.info(f"Starting redpanda...")
- test_mixed_versions = num_to_upgrade > 0
- install_opts = InstallOptions(
- install_previous_version=test_mixed_versions,
- num_to_upgrade=num_to_upgrade)
- self.start_redpanda(num_nodes=3, install_opts=install_opts)
-
- topics = []
- for partition_count in range(1, 5):
- for replication_factor in (1, 3):
- name = f"topic{len(topics)}"
- spec = TopicSpec(name=name,
- partition_count=partition_count,
- replication_factor=replication_factor)
- topics.append(spec)
-
- self.logger.info(f"Creating topics...")
- for spec in topics:
- self.client().create_topic(spec)
-
- num_records = 1000
- produced = set(
- ((f"key-{i:08d}", f"record-{i:08d}") for i in range(num_records)))
-
- for spec in topics:
- self.logger.info(f"Producing to {spec}")
- producer = KafProducer(self.test_context, self.redpanda, spec.name,
- num_records)
- producer.start()
- self.logger.info(
- f"Finished producing to {spec}, waiting for producer...")
- producer.wait()
- producer.free()
- self.logger.info(f"Producer stop complete.")
+ name = f"topic_{partition_count}_{replication_factor}"
+ self.rpk_client().create_topic(name, partition_count,
+ replication_factor)
for _ in range(25):
self._move_and_verify()
- for spec in topics:
- self.logger.info(f"Verifying records in {spec}")
-
- consumer = RpkConsumer(self.test_context,
- self.redpanda,
- spec.name,
- ignore_errors=False,
- retries=0)
- consumer.start()
- timeout = 30
- t1 = time.time()
- consumed = set()
- while consumed != produced:
- if time.time() > t1 + timeout:
- self.logger.error(
- f"Validation failed for topic {spec.name}. Produced {len(produced)}, consumed {len(consumed)}"
- )
- self.logger.error(
- f"Messages consumed but not produced: {sorted(consumed - produced)}"
- )
- self.logger.error(
- f"Messages produced but not consumed: {sorted(produced - consumed)}"
- )
- assert set(consumed) == produced
- else:
- time.sleep(5)
- for m in consumer.messages:
- self.logger.info(f"message: {m}")
- consumed = set([(m['key'], m['value'])
- for m in consumer.messages])
-
- self.logger.info(f"Stopping consumer...")
- consumer.stop()
- self.logger.info(f"Awaiting consumer...")
- consumer.wait()
- self.logger.info(f"Freeing consumer...")
- consumer.free()
-
- self.logger.info(f"Finished verifying records in {spec}")
-
def _get_scale_params(self):
"""
Helper for reducing traffic generation parameters
@@ -253,7 +160,7 @@ def _get_scale_params(self):
return throughput, records, moves
- @cluster(num_nodes=5,
+ @cluster(num_nodes=4,
log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
PREV_VERSION_LOG_ALLOW_LIST)
@matrix(num_to_upgrade=[0, 2])
@@ -261,7 +168,7 @@ def test_dynamic(self, num_to_upgrade):
"""
Move partitions with active consumer / producer
"""
- throughput, records, moves = self._get_scale_params()
+ _, records, moves = self._get_scale_params()
test_mixed_versions = num_to_upgrade > 0
install_opts = InstallOptions(
@@ -270,18 +177,27 @@ def test_dynamic(self, num_to_upgrade):
self.start_redpanda(num_nodes=3,
install_opts=install_opts,
extra_rp_conf={"default_topic_replications": 3})
- spec = TopicSpec(name="topic", partition_count=3, replication_factor=3)
- self.client().create_topic(spec)
- self.topic = spec.name
+ topic = "topic"
+ self.rpk_client().create_topic(topic, 3, 3)
- self.start_producer(1, throughput=throughput)
- self.start_consumer(1)
- self.await_startup()
+ rw_verifier = RWVerifier(self.test_context, self.redpanda)
+ rw_verifier.start()
+ rw_verifier.remote_start(topic, self.redpanda.brokers(), topic, 3)
+ self.logger.info(f"waiting for 100 r&w operations to warm up")
+ rw_verifier.ensure_progress(topic, 100, 10)
+
+ self.logger.info(f"initiate movement")
for _ in range(moves):
self._move_and_verify()
- self.run_validation(enable_idempotence=False,
- consumer_timeout_sec=45,
- min_records=records)
+
+ self.logger.info(f"waiting for 100 r&w operations")
+ rw_verifier.ensure_progress(topic, 100, 10)
+ self.logger.info(f"checkin if cleared {records} operations")
+ rw_verifier.has_cleared(topic, records, 45)
+
+ self.logger.info(f"stopping the workload")
+ rw_verifier.remote_stop(topic)
+ rw_verifier.remote_wait(topic)
@cluster(num_nodes=5,
log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
@@ -314,23 +230,24 @@ def test_move_consumer_offsets_intranode(self, num_to_upgrade):
self.start_consumer(1)
self.await_startup()
- admin = Admin(self.redpanda)
topic = "__consumer_offsets"
partition = 0
for _ in range(moves):
- assignments = self._get_assignments(admin, topic, partition)
+ assignments = self._get_assignments(self.admin_client(), topic,
+ partition)
for a in assignments:
# Bounce between core 0 and 1
a['core'] = (a['core'] + 1) % 2
- admin.set_partition_replicas(topic, partition, assignments)
+ self.admin_client().set_partition_replicas(topic, partition,
+ assignments)
self._wait_post_move(topic, partition, assignments, 360)
self.run_validation(enable_idempotence=False,
consumer_timeout_sec=45,
min_records=records)
- @cluster(num_nodes=5,
+ @cluster(num_nodes=4,
log_allow_list=PARTITION_MOVEMENT_LOG_ERRORS +
RESTART_LOG_ALLOW_LIST + PREV_VERSION_LOG_ALLOW_LIST)
@matrix(num_to_upgrade=[0, 2])
@@ -343,23 +260,25 @@ def test_bootstrapping_after_move(self, num_to_upgrade):
install_previous_version=test_mixed_versions,
num_to_upgrade=num_to_upgrade)
self.start_redpanda(num_nodes=3, install_opts=install_opts)
- spec = TopicSpec(name="topic", partition_count=3, replication_factor=3)
- self.client().create_topic(spec)
- self.topic = spec.name
- self.start_producer(1)
- self.start_consumer(1)
- self.await_startup()
+ topic = "topic"
+ self.rpk_client().create_topic(topic, 3, 3)
+ rw_verifier = RWVerifier(self.test_context, self.redpanda)
+ rw_verifier.start()
+ rw_verifier.remote_start(topic, self.redpanda.brokers(), topic, 3)
+ self.logger.info(f"waiting for 100 r&w operations to warm up")
# execute single move
self._move_and_verify()
- self.run_validation(enable_idempotence=False, consumer_timeout_sec=45)
-
- # snapshot offsets
- rpk = RpkTool(self.redpanda)
+ self.logger.info(f"waiting for 100 r&w operations")
+ rw_verifier.ensure_progress(topic, 100, 10)
+ self.logger.info(f"checkin if cleared 1000 operations")
+ rw_verifier.has_cleared(topic, 1000, 45)
+ rw_verifier.remote_stop(topic)
+ rw_verifier.remote_wait(topic)
def has_offsets_for_all_partitions():
# NOTE: partitions may not be returned if their fields can't be
# populated, e.g. during leadership changes.
- partitions = list(rpk.describe_topic(spec.name))
+ partitions = list(self.rpk_client().describe_topic(topic))
if len(partitions) == 3:
return (True, partitions)
return (False, None)
@@ -374,7 +293,7 @@ def has_offsets_for_all_partitions():
self.redpanda.restart_nodes(self.redpanda.nodes)
def offsets_are_recovered():
- partitions_after = list(rpk.describe_topic(spec.name))
+ partitions_after = list(self.rpk_client().describe_topic(topic))
if len(partitions_after) != 3:
return False
return all([
@@ -395,14 +314,13 @@ def test_invalid_destination(self, num_to_upgrade):
install_previous_version=test_mixed_versions,
num_to_upgrade=num_to_upgrade)
self.start_redpanda(num_nodes=3, install_opts=install_opts)
- spec = TopicSpec(name="topic", partition_count=1, replication_factor=1)
- self.client().create_topic(spec)
- topic = spec.name
+ topic = "topic"
+ self.rpk_client().create_topic(topic, 1, 1)
partition = 0
- admin = Admin(self.redpanda)
- brokers = admin.get_brokers()
- assignments = self._get_assignments(admin, topic, partition)
+ brokers = self.admin_client().get_brokers()
+ assignments = self._get_assignments(self.admin_client(), topic,
+ partition)
# Pick a node id where the topic currently isn't allocated
valid_dest = list(
@@ -417,7 +335,8 @@ def test_invalid_destination(self, num_to_upgrade):
# A valid node but an invalid core
assignments = [{"node_id": valid_dest, "core": invalid_shard}]
try:
- r = admin.set_partition_replicas(topic, partition, assignments)
+ r = self.admin_client().set_partition_replicas(
+ topic, partition, assignments)
except requests.exceptions.HTTPError as e:
assert e.response.status_code == 400
else:
@@ -426,7 +345,8 @@ def test_invalid_destination(self, num_to_upgrade):
# An invalid node but a valid core
assignments = [{"node_id": invalid_dest, "core": 0}]
try:
- r = admin.set_partition_replicas(topic, partition, assignments)
+ r = self.admin_client().set_partition_replicas(
+ topic, partition, assignments)
except requests.exceptions.HTTPError as e:
assert e.response.status_code == 400
else:
@@ -436,7 +356,8 @@ def test_invalid_destination(self, num_to_upgrade):
# Reproducer for https://github.com/redpanda-data/redpanda/issues/2286
assignments = [{"node_id": valid_dest, "core": 3.14}]
try:
- r = admin.set_partition_replicas(topic, partition, assignments)
+ r = self.admin_client().set_partition_replicas(
+ topic, partition, assignments)
except requests.exceptions.HTTPError as e:
assert e.response.status_code == 400
else:
@@ -444,7 +365,8 @@ def test_invalid_destination(self, num_to_upgrade):
assignments = [{"node_id": 3.14, "core": 0}]
try:
- r = admin.set_partition_replicas(topic, partition, assignments)
+ r = self.admin_client().set_partition_replicas(
+ topic, partition, assignments)
except requests.exceptions.HTTPError as e:
assert e.response.status_code == 400
else:
@@ -459,7 +381,8 @@ def test_invalid_destination(self, num_to_upgrade):
"core": 0
}]
try:
- r = admin.set_partition_replicas(topic, partition, assignments)
+ r = self.admin_client().set_partition_replicas(
+ topic, partition, assignments)
except requests.exceptions.HTTPError as e:
assert e.response.status_code == 400
else:
@@ -467,7 +390,8 @@ def test_invalid_destination(self, num_to_upgrade):
# Finally a valid move
assignments = [{"node_id": valid_dest, "core": 0}]
- r = admin.set_partition_replicas(topic, partition, assignments)
+ r = self.admin_client().set_partition_replicas(topic, partition,
+ assignments)
assert r.status_code == 200
@cluster(num_nodes=5, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@@ -488,44 +412,6 @@ def test_overlapping_changes(self, num_to_upgrade):
install_previous_version=test_mixed_versions)
self.start_redpanda(num_nodes=4, install_opts=install_opts)
- node_ids = {1, 2, 3, 4}
-
- # Create topic with enough data that inter-node movement
- # will take a while.
- name = f"movetest"
- spec = TopicSpec(name=name, partition_count=1, replication_factor=3)
- self.client().create_topic(spec)
-
- # Wait for the partition to have a leader (`rpk produce` errors
- # out if it tries to write data before this)
- def partition_ready():
- return KafkaCat(self.redpanda).get_partition_leader(
- name, 0)[0] is not None
-
- wait_until(partition_ready, timeout_sec=10, backoff_sec=0.5)
-
- # Write a substantial amount of data to the topic
- msg_size = 512 * 1024
- write_bytes = 512 * 1024 * 1024
- producer = RpkProducer(self._ctx,
- self.redpanda,
- name,
- msg_size=msg_size,
- msg_count=int(write_bytes / msg_size))
- t1 = time.time()
- producer.start()
-
- # This is an absurdly low expected throughput, but necessarily
- # so to run reliably on current test runners, which share an EBS
- # backend among many parallel tests. 10MB/s has been empirically
- # shown to be too high an expectation.
- expect_bps = 1 * 1024 * 1024
- expect_runtime = write_bytes / expect_bps
- producer.wait(timeout_sec=expect_runtime)
-
- self.logger.info(
- f"Write complete {write_bytes} in {time.time() - t1} seconds")
-
# - Admin API redirects writes but not reads. Because we want synchronous
# status after submitting operations, send all operations to the controller
# leader. This is not necessary for operations to work, just to simplify
@@ -533,9 +419,37 @@ def partition_ready():
# - Because we will later verify that a 503 is sent in response to
# a move request to an in_progress topic, set retry_codes=[] to
# disable default retries on 503.
- admin_node = self.redpanda.controller()
+ admin_node = self.await_controller()
+ admin_node = self.redpanda.get_node(admin_node)
admin = Admin(self.redpanda, default_node=admin_node, retry_codes=[])
+ node_ids = {1, 2, 3, 4}
+
+ # Create topic with enough data that inter-node movement
+ # will take a while.
+ name = f"movetest"
+ partition_count = 1
+ self.rpk_client().create_topic(name, partition_count, 3)
+
+ admin.await_stable_leader(name,
+ partition=0,
+ replication=3,
+ timeout_s=10,
+ backoff_s=0.5)
+
+ msgs = 1024
+ rw_verifier = RWVerifier(self.test_context, self.redpanda)
+ rw_verifier.start()
+ self.logger.info(f"Producing to {name}")
+ t1 = time.time()
+ rw_verifier.remote_start(name, self.redpanda.brokers(), name,
+ partition_count)
+ rw_verifier.ensure_progress(name, msgs, 45)
+ rw_verifier.remote_stop(name)
+ rw_verifier.remote_wait(name)
+ self.logger.info(
+ f"Write complete {msgs} in {time.time() - t1} seconds")
+
# Start an inter-node move, which should take some time
# to complete because of recovery network traffic
assignments = self._get_assignments(admin, name, 0)
@@ -559,15 +473,14 @@ def partition_ready():
# An update to partition properties should succeed
# (issue https://github.com/redpanda-data/redpanda/issues/2300)
- rpk = RpkTool(self.redpanda)
assert admin.get_partitions(name, 0)['status'] == "in_progress"
- rpk.alter_topic_config(name, "retention.ms", "3600000")
+ self.rpk_client().alter_topic_config(name, "retention.ms", "3600000")
# A deletion should succeed
- assert name in rpk.list_topics()
+ assert name in self.rpk_client().list_topics()
assert admin.get_partitions(name, 0)['status'] == "in_progress"
- rpk.delete_topic(name)
- assert name not in rpk.list_topics()
+ self.rpk_client().delete_topic(name)
+ assert name not in self.rpk_client().list_topics()
@cluster(num_nodes=4, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@matrix(num_to_upgrade=[0, 2])
@@ -585,26 +498,28 @@ def test_deletion_stops_move(self, num_to_upgrade):
# create a single topic with replication factor of 1
topic = 'test-topic'
- self.rpk_client().create_topic(topic, 1, 1)
+ partition_count = 1
+ self.rpk_client().create_topic(topic, partition_count, 1)
partition = 0
num_records = 1000
self.logger.info(f"Producing to {topic}")
- producer = KafProducer(self.test_context, self.redpanda, topic,
- num_records)
- producer.start()
- self.logger.info(
- f"Finished producing to {topic}, waiting for producer...")
- producer.wait()
- producer.free()
+ rw_verifier = RWVerifier(self.test_context, self.redpanda)
+ rw_verifier.start()
+ rw_verifier.remote_start(topic, self.redpanda.brokers(), topic,
+ partition_count)
+ rw_verifier.ensure_progress(topic, num_records, 45)
+ rw_verifier.remote_stop(topic)
+ rw_verifier.remote_wait(topic)
+
self.logger.info(f"Producer stop complete.")
- admin = Admin(self.redpanda)
# get current assignments
- assignments = self._get_assignments(admin, topic, partition)
+ assignments = self._get_assignments(self.admin_client(), topic,
+ partition)
assert len(assignments) == 1
self.logger.info(f"assignments for {topic}-{partition}: {assignments}")
- brokers = admin.get_brokers()
+ brokers = self.admin_client().get_brokers()
self.logger.info(f"available brokers: {brokers}")
candidates = list(
filter(lambda b: b['node_id'] != assignments[0]['node_id'],
@@ -622,25 +537,22 @@ def test_deletion_stops_move(self, num_to_upgrade):
alive_hosts = [
n.account.hostname for n in self.redpanda.nodes if n != node
]
- controller_leader = admin.await_stable_leader(
- topic="controller",
- partition=0,
- namespace="redpanda",
+ controller_leader = self.await_controller(
hosts=alive_hosts,
- check=lambda node_id: node_id != self.redpanda.idx(node),
- timeout_s=30)
+ check=lambda node_id: node_id != self.redpanda.idx(node))
controller_leader = self.redpanda.get_node(controller_leader)
- admin.set_partition_replicas(topic,
- partition,
- target_assignment,
- node=controller_leader)
+ self.admin_client().set_partition_replicas(topic,
+ partition,
+ target_assignment,
+ node=controller_leader)
# check that the status is in progress
def get_status():
try:
- partition_info = admin.get_partitions(topic, partition)
+ partition_info = self.admin_client().get_partitions(
+ topic, partition)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
self.logger.info(
@@ -663,34 +575,39 @@ def get_status():
self.rpk_client().create_topic(topic, 1, 1)
wait_until(lambda: get_status() == 'done', 10, 1)
- @cluster(num_nodes=5)
+ @cluster(num_nodes=4)
def test_down_replicate(self):
"""
Test changing replication factor from 3 -> 1
"""
- throughput, records, _ = self._get_scale_params()
+ _, records, _ = self._get_scale_params()
partition_count = 5
self.start_redpanda(num_nodes=3)
- admin = Admin(self.redpanda)
- spec = TopicSpec(partition_count=partition_count, replication_factor=3)
- self.client().create_topic(spec)
- self.topic = spec.name
- self.start_producer(1, throughput=throughput)
- self.start_consumer(1)
- self.await_startup()
+ self.topic = "test-topic"
+ self.rpk_client().create_topic(self.topic, partition_count, 3)
+
+ rw_verifier = RWVerifier(self.test_context, self.redpanda)
+ rw_verifier.start()
+ rw_verifier.remote_start(self.topic, self.redpanda.brokers(),
+ self.topic, partition_count)
+ self.logger.info(f"waiting for 100 r&w operations to warm up")
+ rw_verifier.ensure_progress(self.topic, 100, 10)
for partition in range(0, partition_count):
- assignments = self._get_assignments(admin, self.topic, partition)
+ assignments = self._get_assignments(self.admin_client(),
+ self.topic, partition)
new_assignment = [assignments[0]]
- admin.set_partition_replicas(self.topic, partition, new_assignment)
+ self.admin_client().set_partition_replicas(self.topic, partition,
+ new_assignment)
self._wait_post_move(self.topic, partition, new_assignment, 60)
- self.run_validation(enable_idempotence=False,
- consumer_timeout_sec=45,
- min_records=records)
+ self.logger.info(f"waiting for 100 r&w operations")
+ rw_verifier.ensure_progress(self.topic, 100, 10)
+ self.logger.info(f"checkin if cleared {records} operations")
+ rw_verifier.has_cleared(self.topic, records, 45)
- @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
+ @cluster(num_nodes=5, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_availability_when_one_node_down(self):
"""
Test availability during partition reconfiguration.
@@ -698,20 +615,24 @@ def test_availability_when_one_node_down(self):
The test validates if a partition is available when one of its replicas
is down during reconfiguration.
"""
- throughput, records, _ = self._get_scale_params()
+ _, records, _ = self._get_scale_params()
partition_count = 1
self.start_redpanda(num_nodes=4)
- admin = Admin(self.redpanda)
- spec = TopicSpec(partition_count=partition_count, replication_factor=3)
- self.client().create_topic(spec)
- self.topic = spec.name
+
+ self.topic = "test-topic"
+ self.rpk_client().create_topic(self.topic, partition_count, 3)
+
partition_id = random.randint(0, partition_count - 1)
- self.start_producer(1, throughput=throughput)
- self.start_consumer(1)
- self.await_startup()
+ rw_verifier = RWVerifier(self.test_context, self.redpanda)
+ rw_verifier.start()
+ rw_verifier.remote_start(self.topic, self.redpanda.brokers(),
+ self.topic, partition_count)
+ self.logger.info(f"waiting for 100 r&w operations to warm up")
+ rw_verifier.ensure_progress(self.topic, 100, 10)
- assignments = self._get_assignments(admin, self.topic, partition_id)
+ assignments = self._get_assignments(self.admin_client(), self.topic,
+ partition_id)
self.logger.info(
f"current assignment for {self.topic}/{partition_id}: {assignments}"
)
@@ -719,7 +640,7 @@ def test_availability_when_one_node_down(self):
for a in assignments:
current_replicas.add(a['node_id'])
# replace single replica
- brokers = admin.get_brokers()
+ brokers = self.admin_client().get_brokers()
to_select = [
b for b in brokers if b['node_id'] not in current_replicas
]
@@ -735,9 +656,8 @@ def test_availability_when_one_node_down(self):
self.redpanda.stop_node(self.redpanda.get_node(to_stop))
def new_controller_available():
- controller_id = admin.get_partition_leader(namespace="redpanda",
- topic="controller",
- partition=0)
+ controller_id = self.admin_client().get_partition_leader(
+ namespace="redpanda", topic="controller", partition=0)
self.logger.debug(
f"current controller: {controller_id}, stopped node: {to_stop}"
)
@@ -745,10 +665,11 @@ def new_controller_available():
wait_until(new_controller_available, 30, 1)
# ask partition to move
- admin.set_partition_replicas(self.topic, partition_id, assignments)
+ self.admin_client().set_partition_replicas(self.topic, partition_id,
+ assignments)
def status_done():
- info = admin.get_partitions(self.topic, partition_id)
+ info = self.admin_client().get_partitions(self.topic, partition_id)
self.logger.info(
f"current assignments for {self.topic}/{partition_id}: {info}")
converged = self._equal_assignments(info["replicas"], assignments)
@@ -757,9 +678,10 @@ def status_done():
# wait until redpanda reports complete
wait_until(status_done, timeout_sec=40, backoff_sec=2)
- self.run_validation(enable_idempotence=False,
- consumer_timeout_sec=45,
- min_records=records)
+ self.logger.info(f"waiting for 100 r&w operations")
+ rw_verifier.ensure_progress(self.topic, 100, 10)
+ self.logger.info(f"checkin if cleared {records} operations")
+ rw_verifier.has_cleared(self.topic, records, 45)
class SIPartitionMovementTest(PartitionMovementMixin, EndToEndTest):
@@ -808,7 +730,7 @@ def _partial_upgrade(self, num_to_upgrade: int):
start_timeout=90,
stop_timeout=90)
- @cluster(num_nodes=5, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
+ @cluster(num_nodes=4, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@matrix(num_to_upgrade=[0, 2])
@skip_debug_mode # rolling restarts require more reliable recovery that a slow debug mode cluster can provide
def test_shadow_indexing(self, num_to_upgrade):
@@ -824,16 +746,20 @@ def test_shadow_indexing(self, num_to_upgrade):
install_opts = InstallOptions(
install_previous_version=test_mixed_versions)
self.start_redpanda(num_nodes=3, install_opts=install_opts)
- installer = self.redpanda._installer
-
- spec = TopicSpec(name="topic",
- partition_count=partitions,
- replication_factor=3)
- self.client().create_topic(spec)
- self.topic = spec.name
- self.start_producer(1, throughput=throughput)
- self.start_consumer(1)
- self.await_startup()
+ self.topic = "topic"
+ self.rpk_client().create_topic(self.topic, partitions, 3)
+
+ rw_verifier = RWVerifier(self.test_context, self.redpanda)
+ rw_verifier.start()
+ rw_verifier.remote_start(self.topic, self.redpanda.brokers(),
+ self.topic, partitions)
+ self.logger.info(f"waiting for 100 r&w operations to warm up")
+ try:
+ rw_verifier.ensure_progress(self.topic, 100, 30)
+ except TimeoutError:
+ rw_verifier.remote_stop(self.topic)
+ rw_verifier.remote_wait(self.topic)
+ raise
# We will start an upgrade halfway through the test: this ensures
# that a single-version cluster existed for long enough to actually
@@ -847,11 +773,15 @@ def test_shadow_indexing(self, num_to_upgrade):
self._move_and_verify()
- self.run_validation(enable_idempotence=False,
- consumer_timeout_sec=45,
- min_records=records)
+ self.logger.info(f"waiting for 100 r&w operations")
+ rw_verifier.ensure_progress(self.topic, 100, 10)
+ self.logger.info(f"checkin if cleared {records} operations")
+ rw_verifier.has_cleared(self.topic, records, 45)
+ self.logger.info(f"stopping the workload")
+ rw_verifier.remote_stop(self.topic)
+ rw_verifier.remote_wait(self.topic)
- @cluster(num_nodes=5, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
+ @cluster(num_nodes=4, log_allow_list=PREV_VERSION_LOG_ALLOW_LIST)
@matrix(num_to_upgrade=[0, 2])
@skip_debug_mode # rolling restarts require more reliable recovery that a slow debug mode cluster can provide
def test_cross_shard(self, num_to_upgrade):
@@ -866,17 +796,21 @@ def test_cross_shard(self, num_to_upgrade):
install_previous_version=test_mixed_versions)
self.start_redpanda(num_nodes=3, install_opts=install_opts)
- spec = TopicSpec(name="topic",
- partition_count=partitions,
- replication_factor=3)
- self.client().create_topic(spec)
- self.topic = spec.name
- self.start_producer(1, throughput=throughput)
- self.start_consumer(1)
- self.await_startup()
+ self.topic = "topic"
+ self.rpk_client().create_topic(self.topic, partitions, 3)
+
+ rw_verifier = RWVerifier(self.test_context, self.redpanda)
+ rw_verifier.start()
+ rw_verifier.remote_start(self.topic, self.redpanda.brokers(),
+ self.topic, partitions)
+ self.logger.info(f"waiting for 100 r&w operations to warm up")
+ try:
+ rw_verifier.ensure_progress(self.topic, 100, 30)
+ except TimeoutError:
+ rw_verifier.remote_stop(self.topic)
+ rw_verifier.remote_wait(self.topic)
+ raise
- admin = Admin(self.redpanda)
- topic = self.topic
partition = 0
# We will start an upgrade halfway through the test: this ensures
@@ -889,13 +823,18 @@ def test_cross_shard(self, num_to_upgrade):
if i == upgrade_at_step and test_mixed_versions:
self._partial_upgrade(num_to_upgrade)
- assignments = self._get_assignments(admin, topic, partition)
+ admin = self.admin_client()
+ assignments = self._get_assignments(admin, self.topic, partition)
for a in assignments:
# Bounce between core 0 and 1
a['core'] = (a['core'] + 1) % 2
- admin.set_partition_replicas(topic, partition, assignments)
- self._wait_post_move(topic, partition, assignments, 360)
-
- self.run_validation(enable_idempotence=False,
- consumer_timeout_sec=45,
- min_records=records)
+ admin.set_partition_replicas(self.topic, partition, assignments)
+ self._wait_post_move(self.topic, partition, assignments, 360)
+
+ self.logger.info(f"waiting for 100 r&w operations")
+ rw_verifier.ensure_progress(self.topic, 100, 10)
+ self.logger.info(f"checkin if cleared {records} operations")
+ rw_verifier.has_cleared(self.topic, records, 45)
+ self.logger.info(f"stopping the workload")
+ rw_verifier.remote_stop(self.topic)
+ rw_verifier.remote_wait(self.topic)
diff --git a/tests/rptest/tests/rw_test.py b/tests/rptest/tests/rw_test.py
new file mode 100644
index 000000000000..131cad8c7e22
--- /dev/null
+++ b/tests/rptest/tests/rw_test.py
@@ -0,0 +1,75 @@
+# Copyright 2020 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from rptest.services.cluster import cluster
+from rptest.services.rw_verifier import RWVerifier
+from rptest.util import wait_until
+
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.clients.rpk import RpkTool
+from rptest.clients.rpk import RpkException
+
+
+class RWVerifierTest(RedpandaTest):
+    """
+    Smoke test for the RWVerifier service: runs a read/write workload
+    against a freshly created topic and checks that it makes progress.
+    """
+    def __init__(self, test_context):
+        extra_rp_conf = {
+            "enable_idempotence": True,
+            "id_allocator_replication": 3,
+            "default_topic_replications": 3,
+            "default_topic_partitions": 1,
+            "enable_leader_balancer": False,
+            "enable_auto_rebalance_on_node_add": False
+        }
+
+        super(RWVerifierTest, self).__init__(test_context=test_context,
+                                             extra_rp_conf=extra_rp_conf)
+
+    @cluster(num_nodes=4)
+    def test_rw(self):
+        """
+        Create a two-partition topic, start a workload on it and require
+        1000 read/write operations before stopping the workload.
+        """
+        topic = "topic1"
+        partitions = 2
+        rpk = RpkTool(self.redpanda)
+
+        def create_topic():
+            # Returning False makes wait_until retry; only the
+            # "controller broker is -1" failure is retried (presumably no
+            # controller is known yet -- confirm), anything else is fatal.
+            try:
+                rpk.create_topic(topic, partitions=partitions)
+                return True
+            except RpkException as e:
+                if "Kafka replied that the controller broker is -1" in str(e):
+                    return False
+                # Bare raise re-raises the caught error unchanged.
+                raise
+
+        wait_until(create_topic, 10, 0.1)
+
+        rw_verifier = RWVerifier(self.test_context, self.redpanda)
+        rw_verifier.start()
+
+        rw_verifier.remote_start(topic, self.redpanda.brokers(), topic,
+                                 partitions)
+        try:
+            self.logger.info("Waiting for 1000 r&w operations")
+            rw_verifier.ensure_progress(topic, 1000, 10)
+            self.logger.info("Done")
+        finally:
+            # Stop the workload even if the progress check fails, so a
+            # failed run does not leave it running against the cluster.
+            rw_verifier.remote_stop(topic)
+            rw_verifier.remote_wait(topic)
diff --git a/tests/rptest/tests/tx_reads_writes_test.py b/tests/rptest/tests/tx_reads_writes_test.py
index bfd600d915d4..a205157035ab 100644
--- a/tests/rptest/tests/tx_reads_writes_test.py
+++ b/tests/rptest/tests/tx_reads_writes_test.py
@@ -33,7 +33,7 @@ def __init__(self, test_context):
@cluster(num_nodes=3)
def test_reads_writes(self):
- verifier_jar = "/opt/tx-verifier/tx-verifier.jar"
+ verifier_jar = "/opt/verifiers/verifiers.jar"
self.redpanda.logger.info("creating topics")
@@ -43,7 +43,7 @@ def test_reads_writes(self):
test = "concurrent-reads-writes"
try:
- cmd = "{java} -jar {verifier_jar} {test} {brokers}".format(
+ cmd = "{java} -cp {verifier_jar} io.vectorized.tx_verifier.Verifier {test} {brokers}".format(
java="java",
verifier_jar=verifier_jar,
test=test,
diff --git a/tests/rptest/tests/tx_verifier_test.py b/tests/rptest/tests/tx_verifier_test.py
index bd392bf420bf..d8c92e2d27da 100644
--- a/tests/rptest/tests/tx_verifier_test.py
+++ b/tests/rptest/tests/tx_verifier_test.py
@@ -39,7 +39,7 @@ def __init__(self, test_context):
extra_rp_conf=extra_rp_conf)
def verify(self, tests):
- verifier_jar = "/opt/tx-verifier/tx-verifier.jar"
+ verifier_jar = "/opt/verifiers/verifiers.jar"
self.redpanda.logger.info("creating topics")
@@ -53,7 +53,7 @@ def verify(self, tests):
self.redpanda.logger.info(
"testing txn test \"{test}\"".format(test=test))
try:
- cmd = "{java} -jar {verifier_jar} {test} {brokers}".format(
+ cmd = "{java} -cp {verifier_jar} io.vectorized.tx_verifier.Verifier {test} {brokers}".format(
java="java",
verifier_jar=verifier_jar,
test=test,