Re: KStreams API Usage

2018-05-02 Thread Guozhang Wang
Hello Pradeep,

If you just want to close the Streams app based on the first observation of
a specific value, you can consider using a shutdown latch, in which the
`transformValues()` will decrement and the main thread starting the streams
will listen on, and once the latch has been decremented then calling
kafkaStreams.close(..)
from the main thread.

You can see a concrete example in the simple benchmark code:
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java#L652-L668


Guozhang


On Fri, Apr 27, 2018 at 7:15 PM, pradeep s 
wrote:

> Hi,
>
> I am trying to call kafka stream close based on the presence of a value in
> the output of ValueTransformer.ValueTransformer produces a
> List
>
> Is there a way to avoid the foreach on Kstream and try to get the
> first value alone? (like streams api method findFirst)
>
>  private void checkMerchHierarchyEmpty(KStream List> trans) {
> trans.filter((key, value) -> value.stream().anyMatch(val ->
>
> MERCH_HIERARCHY_CACHE_EMPTY.equals(
>
>   val.getErrorMessage(.foreach(
> ((key, value) -> {
>
> metricsClient.writeMetric(CountMetric.generate(STREAM_SHUTDOWN_ACTION,
> 1));
> log.fatal("Shutting down kafka stream since merch
> hierarchy is empty");
> kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS,
> TimeUnit.MILLISECONDS);
> })
> );
>
> }
>
>
> Thanks
> Pradeep
>



-- 
-- Guozhang


KStreams API Usage

2018-04-27 Thread pradeep s
Hi,

I am trying to call kafka stream close based on the presence of a value in
the output of ValueTransformer.ValueTransformer produces a
List

Is there a way to avoid the foreach on Kstream and try to get the
first value alone? (like streams api method findFirst)

 private void checkMerchHierarchyEmpty(KStream trans) {
trans.filter((key, value) -> value.stream().anyMatch(val ->

MERCH_HIERARCHY_CACHE_EMPTY.equals(

  val.getErrorMessage(.foreach(
((key, value) -> {

metricsClient.writeMetric(CountMetric.generate(STREAM_SHUTDOWN_ACTION,
1));
log.fatal("Shutting down kafka stream since merch
hierarchy is empty");
kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS,
TimeUnit.MILLISECONDS);
})
);

}


Thanks
Pradeep