Hi Tobias,

Though not intuitive, this is expected behavior.

First of all, you see just a single operator in the web UI because all tasks are "chained", i.e. combined to execute directly one after another inside a single operator. This is an optimization technique of Flink. Chaining is done whenever the data is not redistributed (shuffled) between two successive operators.

Now, for the metrics of this operator:

"Bytes/Records received" counts how many bytes were received by the Flink operator from an upstream task. There is no upstream task for this chained operator because it reads from Kafka.

"Bytes/Records sent" counts how many bytes were sent by the Flink operator to a downstream task. There is no downstream task in the case of this chained operator because it writes directly to Solr.
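To make this concrete, here is a minimal, runnable sketch of the idea (plain Java, not actual Flink code; the class, method, and counter names are made up for illustration). The "sent/received" counters only move when records cross a hand-off between tasks, so the fused version processes records while both counters stay at zero:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

public class BoundarySketch {
    // These stand in for the "Records sent" / "Records received" metrics,
    // which count records crossing a task boundary.
    static final AtomicLong sent = new AtomicLong();
    static final AtomicLong received = new AtomicLong();

    // Unchained: the upstream task emits into a hand-off queue (counted
    // as "sent"); the downstream task pulls from it (counted as "received").
    static List<String> runUnchained(List<String> input) {
        Queue<String> handoff = new ArrayDeque<>();
        for (String s : input) {            // upstream task: source + map
            handoff.add(s.toUpperCase());
            sent.incrementAndGet();
        }
        List<String> out = new ArrayList<>();
        while (!handoff.isEmpty()) {        // downstream task: sink
            out.add(handoff.poll());
            received.incrementAndGet();
        }
        return out;
    }

    // Chained: the same logic fused into a single task. Records go
    // straight from map to sink as plain method calls, so there is no
    // boundary to count at and neither counter moves.
    static List<String> runChained(List<String> input) {
        List<String> out = new ArrayList<>();
        for (String s : input) {
            out.add(s.toUpperCase());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runChained(List.of("a", "b")));   // [A, B]
        // All records were processed, yet the boundary metrics are 0/0:
        System.out.println(sent.get() + " sent, " + received.get() + " received");
    }
}
```

In your pipeline the whole chain Kafka source -> ParDo -> Solr sink behaves like `runChained`: the data flows, but there is no inter-task boundary where Flink could count it.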

I'm thinking we might expose a Flink pipeline option in Beam to control chaining. However, users usually want chaining enabled because it is a great optimization technique.

Thanks,
Max

On 05.04.19 12:02, Kaymak, Tobias wrote:
Hello,

I am currently at a hackathon playing with a very simple Beam 2.11 pipeline in Java that reads from Kafka and writes to Solr, with no windowing applied (just mapping the values).

The pipeline works, but the metrics in the Flink web interface always show 0 - which is kind of strange. Any idea what could cause this?

What I find interesting: There is just one node created for the whole operation when the Flink pipeline gets translated from Beam.

         pipeline.apply("Read from Kafka",
            KafkaIO.<String, String>read().withBootstrapServers(bootstrap).withTopics(topics)
                 .withKeyDeserializer(StringDeserializer.class)
                 .withValueDeserializer(StringDeserializer.class)
                 .updateConsumerProperties(consumerProperties)
                 .withReadCommitted()
                 .withTimestampPolicyFactory(withEventTs)
                 .commitOffsetsInFinalize())
          .apply("Convert To Solr Document", ParDo.of(new ToSolrDocument()))
          .apply(SolrIO.write().to("orders_2").withConnectionConfiguration(conn));
         pipeline.run().waitUntilFinish();

[screenshots of the Flink web UI omitted]

Best,
Tobi
