Commit

debug
rystsov committed Feb 2, 2023
1 parent 0a5b4ac · commit ec0282c
Showing 5 changed files with 99 additions and 21 deletions.
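In short: the commit drops the aggregate has_reader flag from App.Metrics (and the matching hasReader bookkeeping in each workload's Partition) and replaces it with per-partition consumer progress: end_offset, read_offset, and consumed. The Python harness then polls the verifier's /info endpoint until every partition reports consumed before blocking on /wait-consumer. A minimal sketch of the /info payload after this change, written as the Python dict the harness receives; the field names come from the diff below, while the values are illustrative only:

# Hypothetical /info response after this commit. Gson serializes the
# public fields of App.Metrics and App.ConsumerMetrics by name.
info = {
    "total_writes": 2048,
    "total_reads": 2048,
    "min_writes": 512,
    "min_reads": 512,
    "consumers": [
        {"partition": 0, "end_offset": 1024, "read_offset": 1024, "consumed": True},
        {"partition": 1, "end_offset": 1024, "read_offset": 640, "consumed": False},
    ],
}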
@@ -1,6 +1,8 @@
package io.vectorized.compaction;
import static spark.Spark.*;

import java.util.ArrayList;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.vectorized.compaction.idempotency.IdempotentWorkload;
@@ -14,7 +16,15 @@ public static class Metrics {
public long total_reads = 0;
public long min_writes = 0;
public long min_reads = 0;
public boolean has_reader = false;

public ArrayList<ConsumerMetrics> consumers = new ArrayList<>();
}

public static class ConsumerMetrics {
public int partition;
public long end_offset = -1;
public long read_offset = -1;
public boolean consumed = false;
}

public static class JsonTransformer implements ResponseTransformer {
@@ -71,12 +71,15 @@ static class Partition {
public HashMap<String, LatestValue> latestKV; // hashed by key

public boolean draining = false;
public boolean hasReader = false;
public int inflights = 0;
public long countWritten = 0;
public long countRead = 0;
public CountDownLatch drained = new CountDownLatch(1);

public long endOffset = -1;
public long readOffset = -1;
public boolean consumed = false;

public Partition(int id) {
this.id = id;
this.writeLog = new LinkedList<>();
@@ -250,25 +253,30 @@ public App.Metrics getMetrics() {
metrics.total_reads = 0;
metrics.min_writes = Long.MAX_VALUE;
metrics.total_writes = 0;
metrics.has_reader = false;
synchronized (this) {
for (int pid = 0; pid < args.partitions; pid++) {
var partition = partitions.get(pid);

metrics.has_reader = metrics.has_reader || partition.hasReader;
metrics.total_reads += partition.countRead;
metrics.min_reads = Math.min(metrics.min_reads, partition.countRead);
metrics.total_writes += partition.countWritten;
metrics.min_writes
= Math.min(metrics.min_writes, partition.countWritten);

var consumer = new App.ConsumerMetrics();
consumer.partition = partition.id;
consumer.end_offset = partition.endOffset;
consumer.read_offset = partition.readOffset;
consumer.consumed = partition.consumed;

metrics.consumers.add(consumer);
}
}
return metrics;
}

private void readProcess(int pid) throws Exception {
var partition = partitions.get(pid);
synchronized (this) { partition.hasReader = true; }

var tp = new TopicPartition(args.topic, pid);
var tps = Collections.singletonList(tp);
@@ -291,6 +299,9 @@ private void readProcess(int pid) throws Exception {
consumer.assign(tps);
consumer.seekToEnd(tps);
long end = consumer.position(tp);
synchronized (this) {
partition.endOffset = end;
}
consumer.seekToBeginning(tps);

long lastOffset = -1;
@@ -310,6 +321,9 @@ var record = it.next();
violation(pid, "read offset " + offset + " after " + lastOffset);
}
lastOffset = offset;
synchronized (this) {
partition.readOffset = lastOffset;
}
var parts = record.value().split("\t");
long opId = Long.parseLong(parts[0]);
log(pid, "r\t" + opId + "\t" + offset);
@@ -365,7 +379,9 @@ var record = it.next();
}
}

synchronized (this) { partition.hasReader = false; }
synchronized (this) {
partition.consumed = true;
}

log(pid, "partition is validated");
}
@@ -7,9 +7,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -31,10 +29,13 @@ public static class InitBody {
static class Partition {
public int id;

public boolean hasReader = false;
public long countWritten = 0;
public long countRead = 0;

public long endOffset = -1;
public long readOffset = -1;
public boolean consumed = false;

public Partition(int id) { this.id = id; }
}

@@ -245,25 +246,30 @@ public App.Metrics getMetrics() {
metrics.total_reads = 0;
metrics.min_writes = Long.MAX_VALUE;
metrics.total_writes = 0;
metrics.has_reader = false;
synchronized (this) {
for (int pid = 0; pid < args.partitions; pid++) {
var partition = partitions.get(pid);

metrics.has_reader = metrics.has_reader || partition.hasReader;
metrics.total_reads += partition.countRead;
metrics.min_reads = Math.min(metrics.min_reads, partition.countRead);
metrics.total_writes += partition.countWritten;
metrics.min_writes
= Math.min(metrics.min_writes, partition.countWritten);

var consumer = new App.ConsumerMetrics();
consumer.partition = partition.id;
consumer.end_offset = partition.endOffset;
consumer.read_offset = partition.readOffset;
consumer.consumed = partition.consumed;

metrics.consumers.add(consumer);
}
}
return metrics;
}

private void readProcess(int pid) throws Exception {
var partition = partitions.get(pid);
synchronized (this) { partition.hasReader = true; }

var tp = new TopicPartition(args.topic, pid);
var tps = Collections.singletonList(tp);
@@ -284,6 +290,9 @@ private void readProcess(int pid) throws Exception {
consumer.assign(tps);
consumer.seekToEnd(tps);
long end = consumer.position(tp);
synchronized (this) {
partition.endOffset = end;
}
consumer.seekToBeginning(tps);

long lastOffset = -1;
@@ -305,6 +314,10 @@ var record = it.next();
}
lastOffset = offset;

synchronized (this) {
partition.readOffset = lastOffset;
}

var parts = record.value().split("\t");

long opId = Long.parseLong(parts[0]);
@@ -348,6 +361,10 @@ var record = it.next();
}
consumer.close();

synchronized (this) {
partition.consumed = true;
}

log(pid, "partition is validated");
}
}
@@ -61,10 +61,13 @@ static class Partition {
public HashMap<Long, WriteInfo> writes; // hashed by WriteInfo.id
public HashMap<String, LatestValue> latestKV; // hashed by key

public boolean hasReader = false;
public long countWritten = 0;
public long countRead = 0;

public long endOffset = -1;
public long readOffset = -1;
public boolean consumed = false;

public Partition(int id) {
this.id = id;
this.writes = new HashMap<>();
@@ -301,25 +304,30 @@ public App.Metrics getMetrics() {
metrics.total_reads = 0;
metrics.min_writes = Long.MAX_VALUE;
metrics.total_writes = 0;
metrics.has_reader = false;
synchronized (this) {
for (int pid = 0; pid < args.partitions; pid++) {
var partition = partitions.get(pid);

metrics.has_reader = metrics.has_reader || partition.hasReader;
metrics.total_reads += partition.countRead;
metrics.min_reads = Math.min(metrics.min_reads, partition.countRead);
metrics.total_writes += partition.countWritten;
metrics.min_writes
= Math.min(metrics.min_writes, partition.countWritten);

var consumer = new App.ConsumerMetrics();
consumer.partition = partition.id;
consumer.end_offset = partition.endOffset;
consumer.read_offset = partition.readOffset;
consumer.consumed = partition.consumed;

metrics.consumers.add(consumer);
}
}
return metrics;
}

private void readProcess(int pid) throws Exception {
var partition = partitions.get(pid);
synchronized (this) { partition.hasReader = true; }

var tp = new TopicPartition(args.topic, pid);
var tps = Collections.singletonList(tp);
@@ -342,6 +350,9 @@ private void readProcess(int pid) throws Exception {
consumer.assign(tps);
consumer.seekToEnd(tps);
long end = consumer.position(tp);
synchronized (this) {
partition.endOffset = end;
}
consumer.seekToBeginning(tps);

long lastOffset = -1;
@@ -361,6 +372,9 @@ var record = it.next();
violation(pid, "read offset " + offset + " after " + lastOffset);
}
lastOffset = offset;
synchronized (this) {
partition.readOffset = lastOffset;
}
var parts = record.value().split("\t");
long opId = Long.parseLong(parts[0]);
log(pid, "r\t" + opId + "\t" + offset);
@@ -420,7 +434,9 @@ var record = it.next();
}
}

synchronized (this) { partition.hasReader = false; }
synchronized (this) {
partition.consumed = true;
}

log(pid, "partition is validated");
}
tests/rptest/services/compacted_verifier.py (27 changes: 23 additions & 4 deletions)
@@ -45,6 +45,7 @@ def __init__(self, context, redpanda, workload):
if workload not in Workload:
raise Exception(f"Unknown workload {workload}")
self._workload = workload
self._partitions = None

def is_alive(self, node):
result = node.account.ssh_output(
@@ -163,6 +164,7 @@ def remote_start_producer(self,
topic,
partitions,
key_set_cardinality=10):
self._partitions = partitions
ip = self._node.account.hostname
r = requests.post(f"http://{ip}:8080/start-producer",
json={
@@ -201,7 +203,24 @@ def remote_wait_producer(self):
f"unexpected status code: {r.status_code} content: {r.content}"
)

def remote_wait_consumer(self):
def remote_wait_consumer(self, timeout_sec=30):
def check_consumed():
info = self.remote_info()
if len(info["consumers"]) != self._partitions:
return False
for consumer in info["consumers"]:
if not consumer["consumed"]:
self._redpanda.logger.error(f"Consumption of partition {consumer['partition']} isn't finished")
return False
return True

wait_until(
check_consumed,
timeout_sec,
2,
err_msg=
f"consumers haven't finished in {timeout_sec}s")

ip = self._node.account.hostname
r = requests.get(f"http://{ip}:8080/wait-consumer")
if r.status_code != 200:
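
With the wait_until loop above, /wait-consumer is only reached once every partition has already reported consumed, so the blocking call mostly serves as a final confirmation. The same readiness check can be reproduced against a running verifier with plain requests; a minimal sketch, assuming the verifier listens on port 8080 as in the diff and that host and partitions are supplied by the caller:

import requests

def all_partitions_consumed(host: str, partitions: int) -> bool:
    # Mirrors check_consumed: the consumers list must cover every
    # partition, and each entry must report consumed=True.
    info = requests.get(f"http://{host}:8080/info").json()
    if len(info["consumers"]) != partitions:
        return False
    return all(c["consumed"] for c in info["consumers"])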
@@ -217,13 +236,13 @@ def _remote_info(self):
)
ip = self._node.account.hostname
url = f"http://{ip}:8080/info"
self._redpanda.logger.debug(f"Dispatching {url}")
self._redpanda.logger.error(f"Dispatching {url}")
r = requests.get(url)
if r.status_code != 200:
raise Exception(
f"unexpected status code: {r.status_code} content: {r.content}"
)
self._redpanda.logger.debug(f"Received {json.dumps(r.json())}")
self._redpanda.logger.error(f"Received {json.dumps(r.json())}")
return r.json()

def remote_info(self):
Expand All @@ -233,7 +252,7 @@ def wrapper():
except CrushedException:
raise
except:
self._redpanda.logger.debug(
self._redpanda.logger.error(
"Got error on fetching info, retrying?!", exc_info=True)
return (False, None)

