Hi,
I’m investigating slowness in our Beam pipelines, and as part of that I tried
to compare a very simple Beam pipeline with an equivalent Flink-native
pipeline. Both pipelines read strings from one Kafka topic and write
them to another topic. I’m using Beam 2.38.0 and Flink 1.13.
I ran each pipeline separately, on a single task manager with a single slot
and parallelism 1. The Flink-native pipeline runs 5 times faster than the Beam
one (150,000 strings per second in Flink compared to 30,000 in Beam).
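To be concrete, by “a single task manager with a single slot and parallelism 1”
I mean roughly the following in flink-conf.yaml (standard Flink configuration
keys, shown here just to make the setup unambiguous):

taskmanager.numberOfTaskSlots: 1
parallelism.default: 1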
I’d be happy if you could help me figure out why there is such a difference.
Maybe the pipelines are not really equivalent, or the Beam configuration is
wrong? (A sketch of the runner options I have in mind is at the end, after the
code.)
Flink native pipeline:
public void process() throws Exception {
    StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", kafkaAddress);
    properties.setProperty("group.id", KAFKA_GROUP_ID);

    // Read strings from the input topic, starting from the earliest offset
    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>(INPUT_TOPIC, new SimpleStringSchema(), properties);
    consumer.setStartFromEarliest();
    consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

    // Write the same strings to the output topic
    FlinkKafkaProducer<String> producer =
            new FlinkKafkaProducer<>(kafkaAddress, OUTPUT_TOPIC, new SimpleStringSchema());

    DataStream<String> inputMessagesStream = environment.addSource(consumer);
    inputMessagesStream.addSink(producer);

    environment.execute();
}
Beam pipeline:
public static void main(String[] args) {
    try {
        StreamingOptions options =
                PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
        options.setStreaming(true);
        options.setRunner(FlinkRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        // Read KV<String, String> records from the input topic, starting from the earliest offset
        PTransform<PBegin, PCollection<KV<String, String>>> transform =
                KafkaIO.<String, String>read()
                        .withBootstrapServers(bootstrapServers)
                        .withTopic(inputTopic)
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withConsumerConfigUpdates(ImmutableMap.of(
                                "auto.offset.reset", "earliest",
                                "group.id", consumerGroup))
                        .withoutMetadata();
        PCollection<KV<String, String>> input =
                pipeline.apply("readFromKafka", transform);

        // Convert each element into a ProducerRecord aimed at the output topic
        PCollection<ProducerRecord<String, String>> convertedInput =
                input.apply("ConvertToStringRecord",
                                ParDo.of(new ConvertToStringRecord(outputTopic) {}))
                        .setCoder(new ProducerRecordCoder<>(
                                StringUtf8Coder.of(), StringUtf8Coder.of()));

        // Write the records back to Kafka
        KafkaIO.WriteRecords<String, String> writeToAvro =
                KafkaIO.<String, String>writeRecords()
                        .withBootstrapServers(bootstrapServers)
                        .withTopic(outputTopic)
                        .withKeySerializer(StringSerializer.class)
                        .withValueSerializer(StringSerializer.class);
        convertedInput.apply("writeToKafka", writeToAvro);

        pipeline.run();
    } catch (Exception e) {
        log.atError().withThrowable(e).log(
                "Exception thrown while running pipeline PipelineStringToString");
    }
}
@Log4j2
@AllArgsConstructor
public class ConvertToStringRecord
        extends DoFn<KV<String, String>, ProducerRecord<String, String>> {

    private String topic;

    // Wrap the incoming key/value in a ProducerRecord and tag it with a type header
    private static ProducerRecord<String, String> getRecord(
            KV<String, String> message, String topic) {
        String string = message.getValue();
        ProducerRecord<String, String> pr =
                new ProducerRecord<>(topic, message.getKey(), string) {};
        pr.headers().add("__TypeId__",
                String.class.getName().getBytes(StandardCharsets.UTF_8));
        return pr;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            ProducerRecord<String, String> pr =
                    getRecord(Objects.requireNonNull(c.element()), topic);
            c.output(pr);
        } catch (Exception e) {
            log.atError().withThrowable(e).log("exception thrown while processing string");
        }
    }
}
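Regarding the Beam configuration question above, this is the kind of thing I
mean: an untested sketch of FlinkPipelineOptions knobs (from
org.apache.beam.runners.flink) that I suspect might affect throughput. I
haven’t measured any of them, so I’d appreciate guidance on whether they are
relevant here.

FlinkPipelineOptions flinkOptions =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
flinkOptions.setStreaming(true);
flinkOptions.setRunner(FlinkRunner.class);
flinkOptions.setParallelism(1);
// Untested candidates -- are any of these expected to matter?
flinkOptions.setObjectReuse(true);     // reuse objects between chained Flink operators
flinkOptions.setFasterCopy(true);      // skip the coder-based deep copy between operators
flinkOptions.setMaxBundleSize(10000L); // elements per bundle (default is 1000)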
Thanks,
Ifat