On 2018/09/06 00:32:14, "Matthias J. Sax" <matth...@confluent.io> wrote:
> 1. There is no API for this.
>
> 2. I guess it might be possible, but might not be the best way to do it.
>
> 3. That is also not possible.
>
>
> I would recommend something like this:
>
>
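> > import java.util.concurrent.atomic.AtomicBoolean;
> > import org.apache.kafka.streams.KafkaStreams;
> > import org.apache.kafka.streams.StreamsBuilder;
> > import org.apache.kafka.streams.kstream.ForeachAction;
> > import org.apache.kafka.streams.kstream.KStream;
> >
> > final AtomicBoolean shutdown = new AtomicBoolean(false);
> >
> > StreamsBuilder builder = ...
> >
> > KStream<K, V> stream = builder.stream(...);
> > // Count records as they pass through; flag shutdown once 100 have been seen.
> > stream.foreach(new ForeachAction<K, V>() {
> >     int processedMessages = 0;
> >
> >     @Override
> >     public void apply(K key, V value) {
> >         if (++processedMessages >= 100) {
> >             shutdown.set(true);
> >         }
> >     }
> > });
> >
> > stream.XXXX // apply regular business logic
> >
> > KafkaStreams streams = ....
> >
> > streams.start();
> >
> > // Poll the flag from the main thread, then shut down.
> > while (!shutdown.get()) {
> >     Thread.sleep(100); // milliseconds
> > }
> >
> > streams.close();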
>
> This would terminate after Streams has processed 100 messages from one
> partition. You can make it fancier, of course. Using interceptors, you
> should be able to do a similar thing.
>
> Hope this helps.
>
>
>
> -Matthias
>
> On 8/7/18 5:18 PM, Kai Jiang wrote:
> > Hi community,
> >
> > Context:
> > We are using Kafka Streams to write an application. Generally, we
> > transform messages from one topic to another topic (without using
> > joins).
> >
> > In order to peek at the output before it flows to the destination
> > topics, we want to add a debug mode that lets us consume only a certain
> > number of messages (~1000) from the source topic. The output of the
> > Kafka Streams application should then be diverted to stdout or files
> > instead of the destination topic. Thus, no messages will be produced to
> > Kafka and we can get a sense of what the result data looks like.
> >
> > Questions:
> > 1. Is it possible to let Kafka Streams consume only n messages from the
> > source topic and then close the stream?
> > 2. I think KafkaConsumerInterceptor is an option for counting messages.
> > But I don't know if there is a way to close Kafka Streams when we reach
> > a certain number.
> > 3. Another potential idea is to make some changes to the topology, e.g.
> > so that the source node only reads messages in topic X from offset A to
> > offset B, which we can set manually.
> >
> > I was wondering which approach is feasible, or whether there are better
> > solutions. Thanks!
> >
> > Best,
> > Kai
> >
> >
> >
>
> Thank you!
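
For what it's worth, the counting idea quoted above can also be sketched with JDK classes only. This is a minimal sketch, not what Matthias posted: `RecordLimit` and its methods are hypothetical names of mine, a plain worker thread stands in for the stream threads, and the Kafka Streams wiring is left out. It swaps the `AtomicBoolean` plus sleep loop for an `AtomicLong` and a `CountDownLatch`, so the count stays correct if several threads happen to call it and the main thread wakes up as soon as the limit is hit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: trips a latch once `limit` records have been seen. */
final class RecordLimit {
    private final long limit;
    private final AtomicLong seen = new AtomicLong();
    private final CountDownLatch reached = new CountDownLatch(1);

    RecordLimit(long limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    /** Call once per processed record; safe to call from several threads. */
    void record() {
        if (seen.incrementAndGet() >= limit) {
            reached.countDown(); // further calls are harmless no-ops
        }
    }

    /** Number of records counted so far. */
    long seen() {
        return seen.get();
    }

    /** Non-blocking check of whether the limit has been hit. */
    boolean isReached() {
        return reached.getCount() == 0;
    }

    /** Blocks until the limit is hit or the timeout expires. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return reached.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        RecordLimit limit = new RecordLimit(100);

        // Stand-in for the stream threads: "processes" records until interrupted.
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                limit.record();
            }
        });
        worker.start();

        boolean hit = limit.await(10, TimeUnit.SECONDS);
        worker.interrupt(); // stand-in for streams.close()
        worker.join();

        System.out.println("limit reached: " + hit + ", records seen: " + limit.seen());
    }
}
```

In a real application, `record()` would presumably be called from the `foreach` action and `streams.close()` would follow `await(...)`. Note that the worker may count a few records past the limit before it is stopped, just as the stream would keep processing until `close()` takes effect.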