Hi Jungtaek,

Thanks for looking into it. We use Spark 2.4.3. I removed most of our code and pasted only what is needed to understand the flow. Sorry for the delay. I will try to provide a simple reproducer when I find time, but this is really hurting us.

Another observation: only when I add some value to the outputEvents list in FlatMapIdFedGroupFunction do I see the function being called multiple times; otherwise it is called only once.

Could you tell me how to debug this, and share any guesses about what could be wrong, so that I can focus my debugging on the right path?

thanks
Robin Kuttaiah

On Fri, Mar 12, 2021 at 8:43 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

> Hi,
>
> Could you please provide the Spark version?
>
> Also it would be pretty much helpful if you could provide a simple
> reproducer, like placing your reproducer which can simply be built (mvn or
> gradle or sbt) into your Github repository, plus the set of input data to
> see the behavior. Worth to know that others aren't interested in your own
> code even if they are interested in the problematic behavior itself. It'd
> be nice if you can minimize the hurdle on debugging.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Mar 11, 2021 at 4:54 PM Kuttaiah Robin <kutta...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a use case where I need to read events(non correlated) from a
>> source kafka topic, then correlate and push forward to another target topic.
>>
>> I use spark structured streaming with FlatMapGroupsWithStateFunction
>> along with GroupStateTimeout.ProcessingTimeTimeout() . After each
>> timeout, I apply some correlation logic on group events and push forward
>> correlated events to another topic(via ForEachBatch). Non correlated events
>> are stored in the state until they are correlated in a future set of events.
>>
>> With this scenario, when I push a single event to source topic, I see it
>> comes three times to FlatMapGroupsWithStateFunction(In separate timestamp)
>> but only once in ForEachBatch processor(which is good).
>>
>> Same event coming thrice in FlatMapGroupsWithStateFunction is a problem
>> as it causes issues with my correlation logic.
>>
>> Can someone help me to understand why this is seen thrice
>> in FlatMapGroupsWithStateFunction?.
>>
>> Code snippets are shown below. Please let me know what is missing and how
>> can i solve this,
>>
>> thanks,
>> Robin Kuttaiah
>>
>> *StreamQuery*
>>
>>   Dataset<MilestoneEvent> sessionUpdates = null;
>>   FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> idstateUpdateFunction =
>>       new FlatMapIdFedGroupFunction(m_InsightEvent, m_InsightDeployment);
>>   try {
>>     sessionUpdates = idFedKafkaEvents
>>         .groupByKey(
>>             new MapFunction<Row, String>() {
>>               private static final long serialVersionUID = -797571731893988577L;
>>               @Override
>>               public String call(Row event) {
>>                 return event.getAs("EVENT_MODEL_ID_COL");
>>               }
>>             },
>>             Encoders.STRING())
>>         .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>>             Encoders.bean(IdentifierConnector.class),
>>             Encoders.bean(MilestoneEvent.class),
>>             GroupStateTimeout.ProcessingTimeTimeout());
>>   } catch (Exception oException) {
>>     //log and throw back exception
>>   }
>>   ForeachBatchProcessor oForeachBatch =
>>       new ForeachBatchProcessor(m_InsightDeployment, m_InsightEvent, m_strQueryName);
>>   DataStreamWriter<MilestoneEvent> events = sessionUpdates
>>       .writeStream()
>>       .queryName(queryName)
>>       .outputMode("append")
>>       .trigger(Trigger.ProcessingTime("5 seconds"))
>>       .option("checkpointLocation", checkpointLocation)
>>       .foreachBatch(oForeachBatch);
>>
>> *FlatMapGroupsWithStateFunction:*
>>
>> public class FlatMapIdFedGroupFunction implements
>>     FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> {
>>
>>   public FlatMapIdFedGroupFunction(InsightEvent iEvent, InsightDeployment iDeployment) {
>>   }
>>
>>   @Override
>>   public Iterator<MilestoneEvent> call(String key, Iterator<Row> events,
>>       GroupState<IdentifierConnector> state) throws Exception {
>>     List<MilestoneEvent> outputEvents = new ArrayList<MilestoneEvent>();
>>     IdentifierConnector session = null;
>>     IdFederationUtil.write("FlatMapIdFedGroupFunction invoked for " + key + " "
>>         + System.currentTimeMillis()); //Called thrice
>>     if (!state.exists()) {
>>       session = new IdentifierConnector();
>>     } else {
>>       session = state.get();
>>     }
>>     while (events.hasNext()) {
>>       Row event = events.next();
>>       MilestoneEvent mEventCurr = IdFederationUtil.getMilestoneEvent(event, insightEvent);
>>       outputEvents.add(mEventCurr);
>>       IdFederationUtil.write(".........." + mEventCurr.getMilestoneId()); //Called thrice
>>       break;
>>     }
>>     return outputEvents.iterator();
>>   }
>> }
>>
>> *ForEachBatchFunction:*
>>
>> public class ForeachBatchProcessor implements
>>     VoidFunction2<Dataset<MilestoneEvent>, Long>, Serializable {
>>
>>   private static final long serialVersionUID = 1L;
>>
>>   public ForeachBatchProcessor(InsightDeployment in_oInsightDeployment,
>>       InsightEvent in_oInsightEvent, String in_strQueryName) {
>>   }
>>
>>   public void call(Dataset<MilestoneEvent> in_Rows, Long in_lBatchID)
>>       throws Exception {
>>     if (in_Rows.count() == 0L) {
>>       return;
>>     }
>>     IdFederationUtil.write("Processing batch " + in_lBatchID + " " + in_Rows.count());
>>     List<MilestoneEvent> events = in_Rows.collectAsList();
>>     for (MilestoneEvent m : events) {
>>       IdFederationUtil.write("......BATCH " + m.getMilestoneId());
>>     }
>>   }
>> }
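
A note on the ForeachBatchProcessor quoted above, offered as a possible line of investigation rather than a confirmed diagnosis: its call method runs three actions on the batch Dataset (count(), count() again inside the log line, then collectAsList()). The Spark documentation for foreachBatch warns that each use of the batch output can cause it to be recomputed, and suggests caching it. If that applies here, each action would re-run the state function, which would match both observations: three invocations when the function emits output, and one when the output is empty (the first count() returns 0 and the method exits early). The plain-Java sketch below does not use Spark at all; LazyBatch, LazyBatchDemo and every other name in it are hypothetical stand-ins that only mimic lazy re-evaluation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java illustration only; nothing here is a Spark API.
final class LazyBatchDemo {

    // Hypothetical stand-in for a lazily evaluated batch Dataset.
    static final class LazyBatch<T> {
        private final Supplier<List<T>> plan;

        LazyBatch(Supplier<List<T>> plan) {
            this.plan = plan;
        }

        // Every "action" re-runs the upstream plan, as an uncached Dataset may do.
        long count() {
            return plan.get().size();
        }

        List<T> collectAsList() {
            return plan.get();
        }

        // Evaluate once and serve later actions from the stored result
        // (loosely analogous to persisting the Dataset).
        LazyBatch<T> cached() {
            List<T> materialized = plan.get();
            return new LazyBatch<T>(() -> materialized);
        }
    }

    // Same shape as ForeachBatchProcessor.call: count, count, collect.
    static <T> void process(LazyBatch<T> batch) {
        if (batch.count() == 0L) {
            return;
        }
        batch.count();          // second action (the log line in the original)
        batch.collectAsList();  // third action
    }

    // Returns how many times the upstream "state function" ran for one batch.
    static int invocations(boolean emitOutput, boolean cache) {
        AtomicInteger calls = new AtomicInteger();
        LazyBatch<String> batch = new LazyBatch<String>(() -> {
            calls.incrementAndGet(); // stands in for the log line in call(...)
            List<String> out = new ArrayList<>();
            if (emitOutput) {
                out.add("milestone-1");
            }
            return out;
        });
        process(cache ? batch.cached() : batch);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(invocations(true, false));  // 3: one run per action
        System.out.println(invocations(false, false)); // 1: early return on empty output
        System.out.println(invocations(true, true));   // 1: evaluated once, then reused
    }
}
```

If this is indeed what happens, the analogous change in the real processor would be to call in_Rows.persist() before the first action and in_Rows.unpersist() at the end, or to call collectAsList() once and derive the count from the resulting list. This has not been verified against Spark 2.4.3.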