Re: Forwarding consumer with kafka streams
Thank you for your help, Eno and Guozhang. Indeed I missed the obvious; I made a bad assumption about defaults and should have checked the source code. I thought Kafka Streams was setting AUTO_OFFSET_RESET_CONFIG to "earliest", and it is, but not for the version I'm using! Version 0.10.0.1 does not touch the AUTO_OFFSET_RESET_CONFIG default (which, as you know, is "latest"). Comparing with what's in trunk for StreamsConfig, it seems a few things have changed since 0.10.0.1.

Thanks again guys, really appreciate it.

--
Ricardo

On Mon, Aug 14, 2017 at 1:15 AM, Guozhang Wang wrote:

> Hi Ricardo,
>
> What you described seems very similar to the demo example code here:
> https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java
>
> If you start the program, it should pipe all data starting from the
> earliest offset to the target topic, no matter how much data the source
> topic has already stored.
>
> Guozhang
>
> On Sat, Aug 12, 2017 at 2:35 PM, Eno Thereska wrote:
>
> > Hi Ricardo,
> >
> > Kafka Streams should handle that case as well. What streams config are
> > you using, could you share it? There is one parameter that is called
> > “ConsumerConfig.AUTO_OFFSET_RESET_CONFIG” and by default it’s set to
> > “earliest”. Any chance your app has changed it to “latest”?
> >
> > Thanks
> > Eno
> >
> > > On Aug 12, 2017, at 5:13 PM, Ricardo Costa wrote:
> > >
> > > Hi,
> > >
> > > I've implemented a forwarding consumer which literally just consumes
> > > the messages from a source topic, logs them and then publishes them
> > > to a target topic.
> > >
> > > I wanted to keep the implementation simple with very little code so
> > > I went with kafka-streams. I have a really simple topology with a
> > > source for the source topic, a sink for the target topic and a
> > > logging processor in-between.
> > >
> > > I'm quite happy with the solution: really simple and elegant. I ran
> > > some basic tests and everything seemed to be working. As I went on
> > > to build more test cases, I found that the stream only does its
> > > thing if I push messages to the source topic *after* creating the
> > > stream and waiting until it is fully initialized. Is this the
> > > expected behaviour? I need the stream to be started at any point in
> > > time and forward the messages that were buffered on the source topic
> > > until then. Are kafka-streams not fit for this use case? Or am I
> > > missing something?
> > >
> > > Thanks in advance!
> > >
> > > --
> > > Ricardo
>
> --
> -- Guozhang
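For reference, a sketch of the fix described above: on 0.10.0.1 the application has to set the consumer's offset reset policy itself rather than rely on the Streams default (the application id and bootstrap servers here are illustrative values, not from the thread):

```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "forwarding-consumer"); // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative
// Streams 0.10.0.1 leaves the consumer default ("latest") untouched, so
// request "earliest" explicitly to pick up records already buffered on
// the source topic before the stream was started.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
```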
Re: Forwarding consumer with kafka streams
Hi Ricardo,

What you described seems very similar to the demo example code here:
https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java

If you start the program, it should pipe all data starting from the earliest offset to the target topic, no matter how much data the source topic has already stored.

Guozhang

On Sat, Aug 12, 2017 at 2:35 PM, Eno Thereska wrote:

> Hi Ricardo,
>
> Kafka Streams should handle that case as well. What streams config are
> you using, could you share it? There is one parameter that is called
> “ConsumerConfig.AUTO_OFFSET_RESET_CONFIG” and by default it’s set to
> “earliest”. Any chance your app has changed it to “latest”?
>
> Thanks
> Eno
>
> > On Aug 12, 2017, at 5:13 PM, Ricardo Costa wrote:
> >
> > Hi,
> >
> > I've implemented a forwarding consumer which literally just consumes
> > the messages from a source topic, logs them and then publishes them
> > to a target topic.
> >
> > I wanted to keep the implementation simple with very little code so
> > I went with kafka-streams. I have a really simple topology with a
> > source for the source topic, a sink for the target topic and a
> > logging processor in-between.
> >
> > I'm quite happy with the solution: really simple and elegant. I ran
> > some basic tests and everything seemed to be working. As I went on to
> > build more test cases, I found that the stream only does its thing if
> > I push messages to the source topic *after* creating the stream and
> > waiting until it is fully initialized. Is this the expected
> > behaviour? I need the stream to be started at any point in time and
> > forward the messages that were buffered on the source topic until
> > then. Are kafka-streams not fit for this use case? Or am I missing
> > something?
> >
> > Thanks in advance!
> >
> > --
> > Ricardo

--
-- Guozhang
Re: Forwarding consumer with kafka streams
Hi Ricardo,

Kafka Streams should handle that case as well. What streams config are you using, could you share it? There is one parameter that is called “ConsumerConfig.AUTO_OFFSET_RESET_CONFIG” and by default it’s set to “earliest”. Any chance your app has changed it to “latest”?

Thanks
Eno

> On Aug 12, 2017, at 5:13 PM, Ricardo Costa wrote:
>
> Hi,
>
> I've implemented a forwarding consumer which literally just consumes
> the messages from a source topic, logs them and then publishes them to
> a target topic.
>
> I wanted to keep the implementation simple with very little code so I
> went with kafka-streams. I have a really simple topology with a source
> for the source topic, a sink for the target topic and a logging
> processor in-between.
>
> I'm quite happy with the solution: really simple and elegant. I ran
> some basic tests and everything seemed to be working. As I went on to
> build more test cases, I found that the stream only does its thing if I
> push messages to the source topic *after* creating the stream and
> waiting until it is fully initialized. Is this the expected behaviour?
> I need the stream to be started at any point in time and forward the
> messages that were buffered on the source topic until then. Are
> kafka-streams not fit for this use case? Or am I missing something?
>
> Thanks in advance!
>
> --
> Ricardo
Forwarding consumer with kafka streams
Hi,

I've implemented a forwarding consumer which literally just consumes the messages from a source topic, logs them and then publishes them to a target topic.

I wanted to keep the implementation simple with very little code so I went with kafka-streams. I have a really simple topology with a source for the source topic, a sink for the target topic and a logging processor in-between.

I'm quite happy with the solution: really simple and elegant. I ran some basic tests and everything seemed to be working. As I went on to build more test cases, I found that the stream only does its thing if I push messages to the source topic *after* creating the stream and waiting until it is fully initialized. Is this the expected behaviour? I need the stream to be started at any point in time and forward the messages that were buffered on the source topic until then. Are kafka-streams not fit for this use case? Or am I missing something?

Thanks in advance!

--
Ricardo
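For readers of the archive, a minimal sketch of the topology described above, written against the 0.10.x-era Streams DSL (topic names, application id, and broker address are illustrative; newer versions replace KStreamBuilder with StreamsBuilder). It also sets auto.offset.reset explicitly, which the rest of the thread shows is necessary on 0.10.0.1 to forward messages already buffered on the source topic:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ForwardingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "forwarding-consumer"); // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // 0.10.0.1 leaves the consumer default ("latest") alone, so request
        // "earliest" explicitly to read records buffered before startup.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream("source-topic"); // source
        stream.map((key, value) -> {                        // logging step in-between
                  System.out.println("forwarding: " + key + " -> " + value);
                  return new KeyValue<>(key, value);        // pass the record through unchanged
              })
              .to("target-topic");                          // sink

        new KafkaStreams(builder, props).start();
    }
}
```

This is only a sketch: it needs the kafka-streams dependency and a running broker, and a production version would also register a shutdown hook to close the streams instance cleanly.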