
Commit

reformatted the files
sumittal committed Nov 14, 2023
1 parent 07e2ea4 commit b311e91
Showing 3 changed files with 53 additions and 40 deletions.
6 changes: 0 additions & 6 deletions tests/rptest/integration-tests/flink_test.py
@@ -1,10 +1,5 @@
from workloads import Workload






from rptest.services.cluster import cluster
from rptest.clients.default import DefaultClient

@@ -21,7 +16,6 @@
from lib.workload import Workload, \
NumberIncrementalWorkload # RealtimeWordCountWorkload, StreamAggregationWorkload, GeospatialDataProcessingWorkload


# class FlinkTest(RedpandaTest):
# def __init__(self, test_context):
# super(FlinkTest,
29 changes: 19 additions & 10 deletions tests/rptest/integration-tests/lib/workloads.py
@@ -17,8 +17,7 @@ def __init__(self, topic="test-source-topic"):
self.topic = topic
self.producer = KafkaProducer(
bootstrap_servers="localhost:9092",
value_serializer=lambda m: json.dumps(m).encode('ascii')
)
value_serializer=lambda m: json.dumps(m).encode('ascii'))

def send_callback(self, num_records, callback):
"""
@@ -46,9 +45,11 @@ def __init__(self, group_id='test-group'):
'group_id': group_id,
'auto_offset_reset': 'earliest',
}
self.consumer = KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('utf-8')), **conf)
self.consumer = KafkaConsumer(
value_deserializer=lambda m: json.loads(m.decode('utf-8')), **conf)

def consume_fixed_number_of_messages(self, topics, num_messages_to_consume):
def consume_fixed_number_of_messages(self, topics,
num_messages_to_consume):
"""
Consume a fixed number of messages from Kafka topics.
@@ -135,7 +136,8 @@ class NumberIncrementalWorkload(Workload):
def __init__(self):
super().__init__(
name="Number Incremental Workload with Timestamp",
description="Generates a stream of numbers with incremental values and timestamps."
description=
"Generates a stream of numbers with incremental values and timestamps."
)

def generate_data(self):
@@ -174,20 +176,26 @@ def verify_data(self, num_records):
sink_consumer = KafkaConsumer('test-sink-topic', **kafka_settings)

print("Reading topics...")
source_topic_records = read_events_and_store(source_consumer, 'test-source-topic', num_records)
source_topic_records = read_events_and_store(source_consumer,
'test-source-topic',
num_records)
import time
time.sleep(5)
sink_topic_records = read_events_and_store(sink_consumer, 'test-sink-topic', num_records)
sink_topic_records = read_events_and_store(sink_consumer,
'test-sink-topic',
num_records)
# print(f"source_topic_records : {source_topic_records}")
# print(f"sink_topic_records : {sink_topic_records}")

errors = []
if len(source_topic_records) == len(sink_topic_records):
for source_key, source_value in source_topic_records.items():
if source_key in sink_topic_records.keys():
if str(source_value["source_sum"]) == str(sink_topic_records[source_key]["sink_sum"]):
if str(source_value["source_sum"]) == str(
sink_topic_records[source_key]["sink_sum"]):
print(
f"Event match: source_sum = {source_value['source_sum']}, sink_sum = {sink_topic_records[source_key]['sink_sum']}")
f"Event match: source_sum = {source_value['source_sum']}, sink_sum = {sink_topic_records[source_key]['sink_sum']}"
)
else:

msg = f"Event mismatch: {source_value['source_sum']}, {sink_topic_records[source_key]['sink_sum']}"
@@ -245,7 +253,8 @@ def read_events_and_store(consumer, topic, num_records):
prod = TestProducer()

num_events = 8
workload = NumberIncrementalWorkload() # Replace with the desired workload class
workload = NumberIncrementalWorkload(
) # Replace with the desired workload class

prod.send_callback(num_events, workload.generate_data)

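For reference, the pattern being re-wrapped in workloads.py above is a plain kafka-python JSON round trip: the producer serializes dicts with json.dumps and the consumer reverses it with json.loads. The sketch below is illustrative only and not part of the commit; it assumes a broker on localhost:9092 and the kafka-python package, and the sample payload and the consumer_timeout_ms value are made up for the example.

# Illustrative sketch (not in the commit): JSON produce/consume round trip with
# kafka-python, mirroring TestProducer and the consumer in workloads.py.
import json

from kafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send("test-source-topic", {"id": 1, "num_a": "1", "num_b": "2"})
producer.flush()

consumer = KafkaConsumer(
    "test-source-topic",
    bootstrap_servers="localhost:9092",
    group_id="test-group",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating if no record arrives for 5 s
    value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for record in consumer:
    print(record.value)  # already a dict, thanks to the deserializer

The serializer/deserializer pair is what lets the workload code pass dicts around while the broker only ever sees bytes.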
58 changes: 34 additions & 24 deletions tests/rptest/services/flink.py
@@ -12,10 +12,10 @@
from pyflink.common.typeinfo import Types
from pyflink.common import Row


# Get the absolute path to the JAR file
# jar_path = "file://" + os.path.abspath("../integration-tests/flink-connector-kafka-3.0.1-1.18.jar")


# change the output type as specified
class SumNumbers(FlatMapFunction):
def flat_map(self, value):
@@ -35,7 +35,10 @@ def flat_map(self, value):
logging.info(f"Sum:{sum}")

# Return the sum as a string
yield Row(id=events["id"], num_a=str(num1), num_b=str(num2), sink_sum=str(sum))
yield Row(id=events["id"],
num_a=str(num1),
num_b=str(num2),
sink_sum=str(sum))


def flink_job(env):
@@ -50,37 +53,42 @@ def flink_job(env):
.build()

# Read the data as a stream of strings
data_stream = env.from_source(redpanda_source, WatermarkStrategy.no_watermarks(), "kafka source")
data_stream = env.from_source(redpanda_source,
WatermarkStrategy.no_watermarks(),
"kafka source")
KafkaSource.builder().set_topics("test-source-topic")

# specify output type
sum_stream = data_stream.flat_map(
SumNumbers(),
output_type=Types.ROW_NAMED(field_names=["id", "num_a", "num_b", "sink_sum"],
field_types=[Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]),
output_type=Types.ROW_NAMED(
field_names=["id", "num_a", "num_b", "sink_sum"],
field_types=[
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING()
]),
)

# Define the output type for the sum_stream
output_type = Types.ROW_NAMED(field_names=["id", "num_a", "num_b", "sink_sum"],
field_types=[Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])
output_type = Types.ROW_NAMED(
field_names=["id", "num_a", "num_b", "sink_sum"],
field_types=[
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING()
])

# set value serialization schema
redpanda_sink = (
KafkaSink.builder()
.set_bootstrap_servers("localhost:9092")
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("test-sink-topic")
.set_value_serialization_schema(
JsonRowSerializationSchema.builder()
.with_type_info(output_type)
.build()
)
.build()
)
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
)
redpanda_sink = (KafkaSink.builder().set_bootstrap_servers(
"localhost:9092").set_record_serializer(
KafkaRecordSerializationSchema.builder().set_topic(
"test-sink-topic").set_value_serialization_schema(
JsonRowSerializationSchema.builder().with_type_info(
output_type).build()).build()).set_delivery_guarantee(
DeliveryGuarantee.AT_LEAST_ONCE).build())

# Write the transformed data to the output topic
sum_stream.sink_to(redpanda_sink)
@@ -91,7 +99,9 @@ def flink_job(env):


if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
logging.basicConfig(stream=sys.stdout,
level=logging.INFO,
format="%(message)s")

# create a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
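For readability, here is a hedged sketch of the full source-to-sink wiring in flink.py in the chained style the pre-reformat code used. Only the sink chain, the output_type, and the flat_map call are taken from the diff; the KafkaSource builder arguments (group id, starting offsets, deserializer), the SumNumbers import path, the add_jars path, and the job name are assumptions, since that part of the hunk is collapsed above.

# Illustrative sketch (not the committed code): Kafka source -> SumNumbers -> Kafka sink.
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (DeliveryGuarantee,
                                                 KafkaOffsetsInitializer,
                                                 KafkaRecordSerializationSchema,
                                                 KafkaSink, KafkaSource)
from pyflink.datastream.formats.json import JsonRowSerializationSchema

from rptest.services.flink import SumNumbers  # assumed import path for the FlatMapFunction above

env = StreamExecutionEnvironment.get_execution_environment()
# env.add_jars("file:///path/to/flink-connector-kafka-3.0.1-1.18.jar")  # see the commented jar_path above

redpanda_source = (KafkaSource.builder()
                   .set_bootstrap_servers("localhost:9092")
                   .set_topics("test-source-topic")
                   .set_group_id("flink-test-group")  # assumed
                   .set_starting_offsets(KafkaOffsetsInitializer.earliest())  # assumed
                   .set_value_only_deserializer(SimpleStringSchema())  # assumed
                   .build())

output_type = Types.ROW_NAMED(
    field_names=["id", "num_a", "num_b", "sink_sum"],
    field_types=[Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])

redpanda_sink = (KafkaSink.builder()
                 .set_bootstrap_servers("localhost:9092")
                 .set_record_serializer(
                     KafkaRecordSerializationSchema.builder()
                     .set_topic("test-sink-topic")
                     .set_value_serialization_schema(
                         JsonRowSerializationSchema.builder()
                         .with_type_info(output_type)
                         .build())
                     .build())
                 .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                 .build())

data_stream = env.from_source(redpanda_source, WatermarkStrategy.no_watermarks(),
                              "kafka source")
data_stream.flat_map(SumNumbers(), output_type=output_type).sink_to(redpanda_sink)
env.execute("number-incremental-sum")  # job name assumed

Both this chained layout and the argument-wrapped layout introduced by the commit build the same job graph; the difference is purely cosmetic.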
