Hi William, I had the same issue and hacked together a simple KafkaIO. It wasn't hard. Unfortunately, I'm traveling with limited connectivity for the next 2 weeks or so, but here's some advice.
0. Note that I worked on flink-dataflow; I have no idea how the code has evolved in Beam, but I bet you can figure out the differences.

1. Check out ConsoleIO (e.g., in flink-dataflow) and TextIO (the latter is in the Dataflow SDK and also has a `to` method). Based on these, you can write a KafkaIO similar to the attached file.

1b. Note that this is probably not the recommended/final way generic sinks will be implemented, but… it's easy and it works. :)

2. This is the core of your implementation. You need to tell the translator what to do with the new sink. In flink-dataflow this was in com.dataartisans.flink.dataflow.translation.FlinkStreamingTransformTranslator.java. You need to create a new transformation method (you can mimic the TextIO one), register it, and finally implement the logic that writes to Kafka (if you use Flink's Kafka sink, it's just a matter of calling their method to write data).

3. For an example of how I was using it, see here: https://github.com/ecesena/oscars2016/blob/master/beam-twitter/src/main/java/com/shopkick/data/dataflow/TwitterDataflow.java#L217

I hope this helps. Feel free to write me if you need more information, but as I said, unfortunately don't expect me to be super responsive…

Best,
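To make points 1 and 2 more concrete, here is a minimal, dependency-free Java sketch of the general pattern: a sink transform configured through a static `to(...)` factory (mirroring TextIO) plus a registry that maps each transform class to a translator function. Everything in it is hypothetical: `KafkaSinkSketch`, `KafkaWrite`, `Context`, `TransformTranslator`, `register` and `translate` are illustrative names, not Beam, Dataflow or Flink APIs, and the translator only records what it would send instead of calling a real Kafka producer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free sketch of the "new sink + registered translator" pattern.
// None of these types are Beam, Dataflow or Flink classes; they only mirror the general shape.
final class KafkaSinkSketch {

    /** Hypothetical stand-in for a pipeline transform. */
    interface Transform {}

    /** Step 1: a KafkaIO-style sink configured with a static to(...) factory, like TextIO's. */
    static final class KafkaWrite implements Transform {
        private final String topic;

        private KafkaWrite(String topic) {
            this.topic = topic;
        }

        static KafkaWrite to(String topic) {
            return new KafkaWrite(topic);
        }

        String getTopic() {
            return topic;
        }
    }

    /** Hypothetical translation context; a real one would expose Flink DataStreams. */
    static final class Context {
        final List<String> input;
        final List<String> sent = new ArrayList<>(); // records "written" to Kafka, as "topic:value"

        Context(List<String> input) {
            this.input = input;
        }
    }

    /** Step 2a: one translator per transform type. */
    interface TransformTranslator<T extends Transform> {
        void translate(T transform, Context ctx);
    }

    private static final Map<Class<?>, TransformTranslator<?>> TRANSLATORS = new HashMap<>();

    /** Step 2b: register a translator for a transform class. */
    static <T extends Transform> void register(Class<T> clazz, TransformTranslator<T> translator) {
        TRANSLATORS.put(clazz, translator);
    }

    static {
        // Step 2c: the write logic. A real translator would attach a Kafka producer sink
        // (for example Flink's FlinkKafkaProducer08) to the input stream; here we only record records.
        register(KafkaWrite.class, (write, ctx) -> {
            for (String record : ctx.input) {
                ctx.sent.add(write.getTopic() + ":" + record);
            }
        });
    }

    /** Look up the translator registered for the transform's runtime class and apply it. */
    @SuppressWarnings("unchecked")
    static <T extends Transform> void translate(T transform, Context ctx) {
        TransformTranslator<T> translator =
            (TransformTranslator<T>) TRANSLATORS.get(transform.getClass());
        if (translator == null) {
            throw new IllegalStateException(
                "No translator registered for " + transform.getClass().getSimpleName());
        }
        translator.translate(transform, ctx);
    }

    public static void main(String[] args) {
        Context ctx = new Context(Arrays.asList("hello", "world"));
        translate(KafkaWrite.to("output-topic"), ctx);
        System.out.println(ctx.sent); // prints [output-topic:hello, output-topic:world]
    }
}
```

In real code the lambda body is where you would hand the input stream to Flink's Kafka producer. Keying dispatch on the transform class is one simple way to let the runner pick up a new sink without touching the existing TextIO translation.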
KafkaIO.java
> On Mar 18, 2016, at 5:23 PM, William McCarthy <[email protected]> wrote:
>
> Hi,
>
> I'm trying to write a proof-of-concept which takes messages from Kafka, transforms them using Beam on Flink, then pushes the results onto a different Kafka topic.
>
> I've used the KafkaWindowedWordCountExample as a starting point, and that's doing the first part of what I want to do, but it outputs to text files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can't figure out how to plug it into the pipeline. I was thinking that it would be wrapped with an UnboundedFlinkSink, or some such, but that doesn't seem to exist.
>
> Any advice or thoughts on what I'm trying to do?
>
> I'm running the latest incubator-beam (as of last night from Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute Engine (Debian Jessie).
>
> Thanks,
>
> Bill McCarthy
