Have a look at https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java.
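KafkaIO.write() handles unbounded PCollections, so there is no need for a Flink-specific sink or a hand-rolled ParDo. A minimal sketch (the builder methods shown are from the current KafkaIO API and may differ slightly in older Beam versions; the broker list, topic name, and the "counts" collection are placeholders):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    // counts is an unbounded PCollection<KV<String, Integer>>
    counts.apply("Write to Kafka",
        KafkaIO.<String, Integer>write()
            .withBootstrapServers("localhost:9092")    // placeholder broker list
            .withTopic("wordcounts")                   // placeholder topic
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(IntegerSerializer.class));

Because it is a regular Beam transform, it works on both bounded and unbounded collections and does not depend on the Flink runtime context.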
02.02.2017 1:07 AM "Boris Lublinsky" <boris.lublin...@lightbend.com> wrote:

> I am trying to write a quick sample of streaming word count using the
> Beam APIs and the Flink Beam runner.
> The problem I am running into is that
>
>     apply("Write to Kafka", Write.to(UnboundedFlinkSink.of(kafkaSink)))
>
> does not work here: Write assumes a bounded collection, and mine is
> unbounded.
>
> I have not found an unbounded equivalent of Write, so I tried to
> implement a custom ParDo function:
>
>     /**
>      * Write content to Kafka.
>      */
>     static class WriteToKafkaFn
>             extends DoFn<Tuple2<String, Integer>, Tuple2<String, Integer>> {
>
>         private final FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink;
>         private boolean opened = false;
>
>         public WriteToKafkaFn(FlinkKafkaProducer09<Tuple2<String, Integer>> kafkaSink) {
>             this.kafkaSink = kafkaSink;
>         }
>
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             // Lazily open the producer on the first element.
>             if (!opened) {
>                 kafkaSink.open(new Configuration());
>                 opened = true;
>             }
>             Tuple2<String, Integer> record = c.element();
>             try {
>                 kafkaSink.invoke(record);
>             } catch (Throwable t) {
>                 System.out.println("Error writing record " + record + " to Kafka");
>                 t.printStackTrace();
>             }
>         }
>     }
>
> The problem with this approach is that the ParDo is not initialized with
> the streaming context that FlinkKafkaProducer relies on, so open() fails.
>
> Any suggestions?