Hi,
I am trying to call kafka stream close based on the presence of a value in
the output of ValueTransformer.ValueTransformer produces a
List<TransformerResult>
Is there a way to avoid the foreach on Kstream and try to get the
first value alone? (like streams api method findFirst)
private void checkMerchHierarchyEmpty(KStream<byte[],
List<TransformerResult>> trans) {
trans.filter((key, value) -> value.stream().anyMatch(val ->
MERCH_HIERARCHY_CACHE_EMPTY.equals(
val.getErrorMessage()))).foreach(
((key, value) -> {
metricsClient.writeMetric(CountMetric.generate(STREAM_SHUTDOWN_ACTION,
1));
log.fatal("Shutting down kafka stream since merch
hierarchy is empty");
kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS,
TimeUnit.MILLISECONDS);
})
);
}
Thanks
Pradeep