Hi Tobias,
Though not intuitive, this is expected behavior.
First of all, you see just a single operator in the web UI because all
tasks are "chained", i.e. combined so that they execute directly one
after another inside a single operator. This is an optimization
technique of Flink: chaining is applied whenever the data is not
redistributed (shuffled) between two successive operators.
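To illustrate the idea (a toy sketch in plain Java, not Flink's actual
runtime classes): chaining is essentially function composition — each
record is handed directly from one operator to the next inside the same
task, with no serialization or network transfer in between.

```java
import java.util.function.Function;

public class ChainingSketch {
    public static void main(String[] args) {
        // Two "operators" that would otherwise run as separate tasks.
        Function<String, Integer> parse = Integer::parseInt;
        Function<Integer, Integer> doubled = n -> n * 2;

        // "Chaining": compose them so a record flows straight from one
        // function into the next within a single call — no boundary at
        // which received/sent bytes could be measured.
        Function<String, Integer> chained = parse.andThen(doubled);

        System.out.println(chained.apply("21")); // prints 42
    }
}
```

When using Flink's DataStream API directly, chaining can be disabled
globally via StreamExecutionEnvironment#disableOperatorChaining(), but
Beam does not expose that knob at the moment.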
Now, for the metrics of this operator:
"Bytes/Records received" counts how many bytes/records the Flink
operator received from an upstream task. There is no upstream task for
this chained operator because it reads directly from Kafka.
"Bytes/Records sent" counts how many bytes/records the operator sent to
a downstream task. There is no downstream task for this chained
operator because it writes directly to Solr.
I'm thinking we might expose a Flink pipeline option in Beam to control
chaining. However, users usually want chaining enabled because it is
such an effective optimization.
Thanks,
Max
On 05.04.19 12:02, Kaymak, Tobias wrote:
Hello,
I am currently at a hackathon playing with a very simple Beam 2.11
pipeline in Java that reads from Kafka and writes to Solr, with no
windowing applied (just mapping the values).
The pipeline works, but the metrics in the Flink web interface always
show 0, which is kind of strange. Any idea what could cause this?
What I find interesting: There is just one node created for the whole
operation when the Flink pipeline gets translated from Beam.
pipeline
    .apply("Read from Kafka",
        KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .updateConsumerProperties(consumerProperties)
            .withReadCommitted()
            .withTimestampPolicyFactory(withEventTs)
            .commitOffsetsInFinalize())
    .apply("Convert To Solr Document", ParDo.of(new ToSolrDocument()))
    .apply(SolrIO.write().to("orders_2").withConnectionConfiguration(conn));

pipeline.run().waitUntilFinish();
Best,
Tobi