Hi,

I am trying to call KafkaStreams close() based on the presence of a
particular value in the output of a ValueTransformer. The ValueTransformer
produces a List<TransformerResult>.

Is there a way to avoid the foreach on the KStream and act on only the
first matching value (like findFirst in the Java Streams API)?

private void checkMerchHierarchyEmpty(KStream<byte[], List<TransformerResult>> trans) {
    trans.filter((key, value) -> value.stream()
            .anyMatch(val -> MERCH_HIERARCHY_CACHE_EMPTY.equals(val.getErrorMessage())))
         .foreach((key, value) -> {
             metricsClient.writeMetric(CountMetric.generate(STREAM_SHUTDOWN_ACTION, 1));
             log.fatal("Shutting down kafka stream since merch hierarchy is empty");
             kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS, TimeUnit.MILLISECONDS);
         });
}
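
To make the "first value only" idea concrete, here is a minimal sketch of one possible workaround: a one-shot guard that runs the shutdown action only for the first matching record, even if foreach sees several. It is plain Java with no Kafka dependency. The class name FirstMatchGuard and the method onRecord are hypothetical names made up for illustration, and the "EMPTY" strings only stand in for the real error message check.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of Kafka Streams): runs an action at most once.
final class FirstMatchGuard {
    // Flips from false to true exactly once, on the first matching record.
    private final AtomicBoolean triggered = new AtomicBoolean(false);
    private final Runnable action;

    FirstMatchGuard(Runnable action) {
        this.action = action;
    }

    // Runs the action only on the first call where matches is true.
    // Returns true if this call ran the action, false otherwise.
    boolean onRecord(boolean matches) {
        if (matches && triggered.compareAndSet(false, true)) {
            action.run();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger shutdowns = new AtomicInteger();
        // The action here only counts invocations; in the real code it would
        // be the metric write, the log call and the close call.
        FirstMatchGuard guard = new FirstMatchGuard(shutdowns::incrementAndGet);
        for (String msg : Arrays.asList("ok", "EMPTY", "EMPTY", "ok", "EMPTY")) {
            guard.onRecord("EMPTY".equals(msg));
        }
        System.out.println("shutdown actions run: " + shutdowns.get()); // prints 1
    }
}
```

With something like this, the foreach body above could delegate to guard.onRecord(true), so that later matching records become no-ops. This does not stop the stream from delivering further records to foreach; it only ensures the shutdown action is attempted once.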


Thanks
Pradeep
