Have a look at KafkaIO:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
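
KafkaIO.write() accepts an unbounded PCollection directly. A minimal sketch
with the current KafkaIO API (method names may differ in older releases; the
broker address, topic name, and KV<String, Integer> element type are my
assumptions, not from your pipeline):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// counts is assumed to be your unbounded PCollection<KV<String, Integer>>
// of (word, count) pairs.
counts.apply("Write to Kafka",
    KafkaIO.<String, Integer>write()
        .withBootstrapServers("localhost:9092")    // assumed broker address
        .withTopic("wordcount-output")             // assumed topic name
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(IntegerSerializer.class));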

On 02.02.2017 1:07 AM, "Boris Lublinsky" <boris.lublin...@lightbend.com>
wrote:

> I am trying to write a quick streaming word-count sample using the Beam
> APIs and the FlinkBeamRunner.
> The problem I am running into is that
>
> apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))
>
> does not work: it assumes a bounded stream, and mine is unbounded.
>
> I have not found an unbounded equivalent of Write, so I tried to
> implement a custom ParDo function:
>
> /**
>  * Writes each (word, count) pair to Kafka through a Flink sink.
>  */
> static class WriteToKafkaFn
>         extends DoFn<Tuple2<String, Integer>, Tuple2<String, Integer>> {
>
>     private FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink;
>     private boolean opened = false;
>
>     public WriteToKafkaFn(
>             FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink) {
>         this.kafkaSink = kafkaSink;
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         // Lazily open the Flink sink on the first element; this is the
>         // call that fails, since the sink expects a Flink runtime context.
>         if (!opened) {
>             kafkaSink.open(new Configuration());
>             opened = true;
>         }
>         Tuple2<String, Integer> record = c.element();
>         try {
>             kafkaSink.invoke(record);
>         } catch (Throwable t) {
>             System.out.println("Error writing record " + record
>                     + " to Kafka");
>             t.printStackTrace();
>         }
>     }
> }
>
>
>
> The problem with this approach is that the ParDo is not initialized with
> the streaming runtime context that the FlinkKafkaProducer relies on, so
> open() fails.
>
>
> Any suggestions?
>
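
If you'd rather keep the DoFn approach, one workaround is to drop the Flink
sink and manage a plain KafkaProducer in the DoFn's @Setup/@Teardown methods,
so no Flink runtime context is needed. A sketch, using Beam's KV instead of
Flink's Tuple2, with assumed broker and topic settings:

import java.util.Properties;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

static class KafkaWriteFn extends DoFn<KV<String, Integer>, Void> {

    private final String bootstrapServers;  // assumed broker address
    private final String topic;             // assumed output topic
    private transient KafkaProducer<String, Integer> producer;

    KafkaWriteFn(String bootstrapServers, String topic) {
        this.bootstrapServers = bootstrapServers;
        this.topic = topic;
    }

    @Setup
    public void setup() {
        // Create the producer once per DoFn instance, independent of any
        // Flink-specific lifecycle.
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.IntegerSerializer");
        producer = new KafkaProducer<>(props);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Fire-and-forget send; production code would check the returned Future.
        KV<String, Integer> e = c.element();
        producer.send(new ProducerRecord<>(topic, e.getKey(), e.getValue()));
    }

    @Teardown
    public void teardown() {
        if (producer != null) {
            producer.close();
        }
    }
}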
