Hi all, to keep my question simple, here is my code. BAR gets printed but FOO does not, and I am not sure why. My batch interval is 1 second (I pass it in when I create the Spark context). Any ideas? I have a bunch of events, and every second I want to store the number of events whose status is "Pending" (no prior state is needed).
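    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;

    jsonMessagesDStream
        .filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v1) throws Exception {
                System.out.println("****************FOO******************");
                JsonParser parser = new JsonParser();
                JsonObject jsonObj = parser.parse(v1).getAsJsonObject();
                // Keep only events whose "status" field is "Pending" (case-insensitive)
                if (jsonObj != null && jsonObj.has("status")) {
                    return jsonObj.get("status").getAsString().equalsIgnoreCase("Pending");
                }
                return false;
            }
        })
        .foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> stringJavaRDD) throws Exception {
                System.out.println("*****************BAR******************");
                // Count the Pending events in this batch and store the result
                store(stringJavaRDD.count());
            }
        });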
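For reference, here is a minimal plain-Java sketch of the per-batch logic described above (keep events whose status is "Pending", then count them), which can be run without Spark to check the predicate on its own. It assumes each event has already been parsed into a Map<String, String>, standing in for the Gson parsing done inside filter(); the class and method names PendingCount, isPending, and countPending are hypothetical. In the streaming job, the equivalent filter followed by count() would run once per 1-second batch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical local check of the per-batch logic, independent of Spark.
// Assumption: each event is already parsed into a map of field name -> value.
public class PendingCount {

    // Mirrors the filter predicate: true only if a "status" field exists
    // and equals "Pending", ignoring case.
    static boolean isPending(Map<String, String> event) {
        String status = event.get("status");
        return status != null && status.equalsIgnoreCase("Pending");
    }

    // Mirrors filter(...) followed by count() for a single batch.
    static long countPending(List<Map<String, String>> batch) {
        return batch.stream().filter(PendingCount::isPending).count();
    }

    public static void main(String[] args) {
        List<Map<String, String>> batch = Arrays.asList(
            Collections.singletonMap("status", "Pending"),
            Collections.singletonMap("status", "pending"),
            Collections.singletonMap("status", "Done"),
            Collections.singletonMap("id", "42"));
        System.out.println(countPending(batch)); // prints 2
    }
}
```

Events without a "status" field are treated as not pending, matching the has("status") check in the original filter.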