Hello,

I have a use case where I need to read uncorrelated events from a source Kafka topic, correlate them, and push the result to another target topic.

I use Spark Structured Streaming with FlatMapGroupsWithStateFunction and GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I apply some correlation logic to the events in the group and push the correlated events to another topic (via foreachBatch). Uncorrelated events are kept in the state until they can be correlated with a future set of events.

With this setup, when I push a single event to the source topic, it arrives three times in FlatMapGroupsWithStateFunction (each time with a different timestamp), but only once in the foreachBatch processor (which is good). Receiving the same event three times in FlatMapGroupsWithStateFunction is a problem because it breaks my correlation logic.

Can someone help me understand why the event is seen three times in FlatMapGroupsWithStateFunction? Code snippets are shown below. Please let me know what is missing and how I can solve this.

Thanks,
Robin Kuttaiah

*StreamQuery:*

    Dataset<MilestoneEvent> sessionUpdates = null;
    FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> idstateUpdateFunction =
        new FlatMapIdFedGroupFunction(m_InsightEvent, m_InsightDeployment);
    try {
      sessionUpdates = idFedKafkaEvents
          .groupByKey(
              new MapFunction<Row, String>() {
                private static final long serialVersionUID = -797571731893988577L;

                @Override
                public String call(Row event) {
                  return event.getAs("EVENT_MODEL_ID_COL");
                }
              },
              Encoders.STRING())
          .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
              Encoders.bean(IdentifierConnector.class),
              Encoders.bean(MilestoneEvent.class),
              GroupStateTimeout.ProcessingTimeTimeout());
    } catch (Exception oException) {
      //log and throw back exception
    }

    ForeachBatchProcessor oForeachBatch =
        new ForeachBatchProcessor(m_InsightDeployment, m_InsightEvent, m_strQueryName);

    DataStreamWriter<MilestoneEvent> events = sessionUpdates
        .writeStream()
        .queryName(queryName)
        .outputMode("append")
        .trigger(Trigger.ProcessingTime("5 seconds"))
        .option("checkpointLocation", checkpointLocation)
        .foreachBatch(oForeachBatch);

*FlatMapGroupsWithStateFunction:*

    public class FlatMapIdFedGroupFunction
        implements FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> {

      public FlatMapIdFedGroupFunction(InsightEvent iEvent, InsightDeployment iDeployment) {
      }

      @Override
      public Iterator<MilestoneEvent> call(String key, Iterator<Row> events,
          GroupState<IdentifierConnector> state) throws Exception {
        List<MilestoneEvent> outputEvents = new ArrayList<MilestoneEvent>();
        IdentifierConnector session = null;
        IdFederationUtil.write("FlatMapIdFedGroupFunction invoked for " + key + " "
            + System.currentTimeMillis()); //Called thrice
        if (!state.exists()) {
          session = new IdentifierConnector();
        } else {
          session = state.get();
        }
        while (events.hasNext()) {
          Row event = events.next();
          MilestoneEvent mEventCurr = IdFederationUtil.getMilestoneEvent(event, insightEvent);
          outputEvents.add(mEventCurr);
          IdFederationUtil.write(".........." + mEventCurr.getMilestoneId()); //Called thrice
          break;
        }
        return outputEvents.iterator();
      }
    }

*ForEachBatchFunction:*

    public class ForeachBatchProcessor
        implements VoidFunction2<Dataset<MilestoneEvent>, Long>, Serializable {

      private static final long serialVersionUID = 1L;

      public ForeachBatchProcessor(InsightDeployment in_oInsightDeployment,
          InsightEvent in_oInsightEvent, String in_strQueryName) {
      }

      public void call(Dataset<MilestoneEvent> in_Rows, Long in_lBatchID) throws Exception {
        if (in_Rows.count() == 0L) {
          return;
        }
        IdFederationUtil.write("Processing batch " + in_lBatchID + " " + in_Rows.count());
        List<MilestoneEvent> events = in_Rows.collectAsList();
        for (MilestoneEvent m : events) {
          IdFederationUtil.write("......BATCH " + m.getMilestoneId());
        }
      }
    }
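
One thing that may be worth checking (an assumption, not verified against this job): ForeachBatchProcessor.call() above runs three actions on the batch Dataset (count(), count() again, and collectAsList()). The Spark documentation for foreachBatch notes that output data can be recomputed for each write unless the Dataset is cached, so each action could re-run the stateful function, which would match the three invocations. The sketch below is plain Java with no Spark dependency; LazyBatch, LazyBatchDemo and upstreamCalls are hypothetical names that only model a lazily evaluated batch, to show how three actions turn into three upstream calls and how persisting reduces that to one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java model (no Spark): a lazy batch re-runs its upstream function on every action.
final class LazyBatchDemo {

    /** Hypothetical stand-in for a lazily evaluated micro-batch Dataset. */
    static final class LazyBatch {
        private final Supplier<List<String>> upstream;
        private boolean persisted;      // set by persist(); caching happens on the first action
        private List<String> cached;    // null until the first action after persist()

        LazyBatch(Supplier<List<String>> upstream) {
            this.upstream = upstream;
        }

        // Evaluates the upstream function unless a cached result is available.
        private List<String> rows() {
            if (cached != null) {
                return cached;
            }
            List<String> result = upstream.get();
            if (persisted) {
                cached = result;
            }
            return result;
        }

        LazyBatch persist() {
            persisted = true;
            return this;
        }

        long count() {
            return rows().size();
        }

        List<String> collectAsList() {
            return new ArrayList<>(rows());
        }
    }

    /**
     * Runs the same three actions as the ForeachBatchProcessor in the post and
     * returns how many times the upstream function was evaluated.
     */
    static int upstreamCalls(boolean persist) {
        AtomicInteger calls = new AtomicInteger();
        LazyBatch batch = new LazyBatch(() -> {
            calls.incrementAndGet();              // stands in for the "invoked for <key>" log line
            return List.of("milestone-1");
        });
        if (persist) {
            batch.persist();
        }
        if (batch.count() == 0L) {                // action 1
            return calls.get();
        }
        long size = batch.count();                // action 2
        List<String> events = batch.collectAsList(); // action 3
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("upstream calls without persist: " + upstreamCalls(false));
        System.out.println("upstream calls with persist:    " + upstreamCalls(true));
    }
}
```

If this is indeed the cause, the analogous change in the real job would be to call in_Rows.persist() at the start of call(), run a single collectAsList() and derive the size from the resulting list, then call in_Rows.unpersist() at the end. I have not tested that against this query.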