Thanks for the explanation!

On Fri, Apr 5, 2019 at 1:26 PM Maximilian Michels <[email protected]> wrote:

> Hi Tobias,
>
> Though not intuitive, this is expected behavior.
>
> First of all, you see just a single operator in the web UI because all
> tasks are "chained", i.e. combined to execute directly one after another
> inside a single operator. This is an optimization technique of Flink.
> Chaining is done whenever the data is not redistributed (shuffled)
> between two successive operators.
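> To make this concrete, here is a toy sketch in plain Java (not
> Flink's API, purely illustrative) of what chaining amounts to:
> successive operators are composed into a single function, so each
> record passes through all of them in one call, with no hand-off in
> between:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ChainingSketch {
    public static void main(String[] args) {
        // Two successive "operators", analogous to map stages in a pipeline.
        Function<String, Integer> parse = Integer::parseInt;
        Function<Integer, String> format = n -> "doc-" + n;

        // Chaining: the runner fuses them into one function, so records are
        // never handed between separate tasks.
        Function<String, String> chained = parse.andThen(format);

        List<String> out = List.of("17", "42").stream()
                .map(chained)
                .collect(Collectors.toList());
        System.out.println(out); // prints [doc-17, doc-42]
    }
}
```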
>
> Now, for the metrics of this operator:
>
> "Bytes/Records received" counts how many bytes were received by the
> Flink operator from an upstream task. There is no upstream task for this
> chained operator because it reads from Kafka.
>
> "Bytes/Records sent" counts how many bytes this operator sent
> downstream to the next task. There is no downstream task for this
> chained operator because it writes directly to Solr.
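> A toy sketch in plain Java (again not Flink's API, just the idea):
> these counters live on the edges between tasks, and a fully chained
> source-to-sink operator has no such edge, so the counters never move:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FusedMetricsSketch {
    // Counts records crossing the (hypothetical) channel between two tasks.
    static final AtomicLong recordsSent = new AtomicLong();

    // Unchained: records are "shipped" downstream, ticking the edge counter.
    static List<String> runUnchained(List<String> in, Function<String, String> op) {
        return in.stream()
                .map(r -> { recordsSent.incrementAndGet(); return op.apply(r); })
                .collect(Collectors.toList());
    }

    // Chained: source, map, and sink run in one task; nothing crosses a
    // channel, so the "sent/received" counter stays at 0.
    static List<String> runChained(List<String> in, Function<String, String> op) {
        return in.stream().map(op).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        runChained(List.of("a", "b", "c"), String::toUpperCase);
        System.out.println("chained: records sent = " + recordsSent.get());

        runUnchained(List.of("a", "b", "c"), String::toUpperCase);
        System.out.println("unchained: records sent = " + recordsSent.get());
    }
}
```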
>
> I'm thinking we might expose a Flink pipeline option in Beam to
> control chaining. However, users usually want chaining enabled because
> it is a great optimization technique.
>
> Thanks,
> Max
>
> On 05.04.19 12:02, Kaymak, Tobias wrote:
> > Hello,
> >
> > I am currently at a hackathon playing with a very simple Beam 2.11
> > pipeline in Java that reads from Kafka and writes to Solr with no
> > windowing applied (Just mapping the values).
> >
> > The pipeline works, but the metrics in the Flink web interface always
> > show 0 - which is kind of strange. Any idea what could cause this?
> >
> > What I find interesting: There is just one node created for the whole
> > operation when the Flink pipeline gets translated from Beam.
> >
> >     pipeline.apply("Read from Kafka",
> >         KafkaIO.<String, String>read()
> >             .withBootstrapServers(bootstrap)
> >             .withTopics(topics)
> >             .withKeyDeserializer(StringDeserializer.class)
> >             .withValueDeserializer(StringDeserializer.class)
> >             .updateConsumerProperties(consumerProperties)
> >             .withReadCommitted()
> >             .withTimestampPolicyFactory(withEventTs)
> >             .commitOffsetsInFinalize())
> >         .apply("Convert To Solr Document", ParDo.of(new ToSolrDocument()))
> >         .apply(SolrIO.write().to("orders_2").withConnectionConfiguration(conn));
> >     pipeline.run().waitUntilFinish();
> >
> >
> > Best,
> > Tobi
>
