Hello Pradeep,

If you just want to close the Streams app on the first observation of a specific value, consider using a shutdown latch: the `transformValues()` step counts the latch down when it sees that value, and the main thread that started the streams application waits on the latch. Once the latch has been counted down, the main thread calls kafkaStreams.close(..).
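The latch idea above can be sketched roughly as follows. This is a Kafka-free illustration, not code from the benchmark: `FakeStreams` and `transform` are hypothetical stand-ins for `KafkaStreams` and the body of your `ValueTransformer`, and the sentinel string is only assumed from the constant name in your snippet.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownLatchSketch {

    // Assumed sentinel value, named after MERCH_HIERARCHY_CACHE_EMPTY in the question.
    static final String SENTINEL = "MERCH_HIERARCHY_CACHE_EMPTY";

    /** Hypothetical stand-in for KafkaStreams; only records that close() was called. */
    static final class FakeStreams {
        final AtomicBoolean closed = new AtomicBoolean(false);

        void close() {
            closed.set(true);
        }
    }

    /** Stand-in for the transformer body: counts the latch down when the sentinel appears. */
    static String transform(String value, CountDownLatch shutdownLatch) {
        if (SENTINEL.equals(value)) {
            shutdownLatch.countDown(); // calls after the count reaches zero are no-ops
        }
        return value;
    }

    /** Processes values on a worker thread; the calling thread waits and then closes. */
    static boolean runUntilSentinel(List<String> values, long timeoutMs) {
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        FakeStreams streams = new FakeStreams();

        // The worker plays the role of a stream thread running the topology.
        Thread worker = new Thread(() -> values.forEach(v -> transform(v, shutdownLatch)));
        worker.setDaemon(true);
        worker.start();

        try {
            // The main thread blocks here; close() is never called from the worker.
            if (shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                streams.close();
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return streams.closed.get();
    }

    public static void main(String[] args) {
        boolean closed = runUntilSentinel(List.of("ok", SENTINEL, "ok"), 5_000);
        System.out.println("closed = " + closed); // prints "closed = true"
    }
}
```

In a real application the `countDown()` call would sit inside the transformer's `transform()` method and `streams.close()` would be your `kafkaStreams.close(..)` call; the timeout on `await` is only there so the sketch terminates when the sentinel never shows up.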
You can see a concrete example in the simple benchmark code:

https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java#L652-L668

Guozhang

On Fri, Apr 27, 2018 at 7:15 PM, pradeep s <sreekumar.prad...@gmail.com> wrote:

> Hi,
>
> I am trying to call kafka stream close based on the presence of a value in
> the output of ValueTransformer. ValueTransformer produces a
> List<TransformerResult>
>
> Is there a way to avoid the foreach on Kstream and try to get the
> first value alone? (like streams api method findFirst)
>
> private void checkMerchHierarchyEmpty(KStream<byte[], List<TransformerResult>> trans) {
>     trans.filter((key, value) -> value.stream().anyMatch(val ->
>             MERCH_HIERARCHY_CACHE_EMPTY.equals(val.getErrorMessage())))
>         .foreach(
>             ((key, value) -> {
>                 metricsClient.writeMetric(CountMetric.generate(STREAM_SHUTDOWN_ACTION, 1));
>                 log.fatal("Shutting down kafka stream since merch hierarchy is empty");
>                 kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS, TimeUnit.MILLISECONDS);
>             })
>         );
> }
>
> Thanks
> Pradeep

--
Guozhang