In my Spark Streaming application I am reading data from certain Kafka topic.
While reading from topic whenever I encounter certain message (for example:
"poison") I want to stop the streaming. Currently I am achieving this using
following code:  jsc is instance of JavaStreamingContext and directStream is
instance of JavaPairInputDStream./LongAccumulator poisonNotifier =
sc.longAccumulator("poisonNotifier");directStream.foreachRDD(rdd -> {           
RDD rows = rdd.values().map(value -> {                              if
(value.equals("poison") {                    poisonNotifier.add(1);             
  
} else {                    ...                 }                return row;    
       
}).rdd();        });jsc.start();ExecutorService poisonMonitor =
Executors.newSingleThreadExecutor();poisonMonitor.execute(() -> {    while
(true) {        if (poisonNotifier.value() > 0) {            jsc.stop(false,
true);            break;        }    }});try {    jsc.awaitTermination();}
catch (InterruptedException e) {   
e.printStackTrace();}poisonMonitor.shutdown();/Although this approach is
working, this doesn't sounds like right approach to me. Is there any other
better(cleaner) way to achieve the same?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-spark-steaming-context-on-encountering-certain-type-of-message-on-Kafka-tp27822.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reply via email to