Hi William,

I had the same issue and hacked together a simple KafkaIO. It hasn’t been hard.
Unfortunately right now I’m traveling with limited connection for the next 2 
weeks or so, but here’s some advice.

0. Note that I worked on flink-dataflow, I have no idea how code has evolved 
into beam, but I bet you can figure out the differences.

1. Check out the ConsoleIO (e.g., in flink-dataflow) and TextIO (the latter in 
the dataflow sdk, this has also a method `to`). Based on these, you can write a 
KafkaIO similar to the file in attachment.

1b. Note that this is probably not the recommended/final way that will be used 
to implement generic sinks, but… it’s easy and it works. :)

2. This is the core of your implementation. You need to inform the translator 
on what to do with the new sink. In flink-dataflow this was in 
com.dataartisans.flink.dataflow.translation.FlinkStreamingTransformTranslator.java.
 You need to create a new transformation method (you can mimic the TextIO one), 
register it, and finally implement the logic that writes to Kafka (if you use 
Flink’s Kafka sink, then it’s just a matter of calling their method to write 
data).

3. For an example of how I was using it, see here:
https://github.com/ecesena/oscars2016/blob/master/beam-twitter/src/main/java/com/shopkick/data/dataflow/TwitterDataflow.java#L217

I hope this helps, feel free to write me if you need more information, but as I 
said unfortunately don’t expect me to be super responsive…

Best,

Attachment: KafkaIO.java
Description: Binary data


> On Mar 18, 2016, at 5:23 PM, William McCarthy <[email protected]> 
> wrote:
> 
> Hi,
> 
> I’m trying to write a proof-of-concept which takes messages from Kafka, 
> transforms them using Beam on Flink, then pushes the results onto a different 
> Kafka topic.
> 
> I’ve used the KafkaWindowedWordCountExample as a starting point, and that’s 
> doing the first part of what I want to do, but it outputs to text files as 
> opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can’t figure 
> out how to plug it into the pipeline. I was thinking that it would be wrapped 
> with an UnboundedFlinkSink, or some such, but that doesn’t seem to exist.
> 
> Any advice or thoughts on what I’m trying to do?
> 
> I’m running the latest incubator-beam (as of last night from Github), Flink 
> 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute Engine (Debian 
> Jessie).
> 
> Thanks,
> 
> Bill McCarthy

Reply via email to