Hi,

Could you please provide the Spark version?

It would also be very helpful if you could provide a simple reproducer, for example a small project that builds easily (mvn, gradle, or sbt) in your GitHub repository, plus the set of input data needed to see the behavior. It's worth knowing that others aren't interested in your own code, even if they are interested in the problematic behavior itself, so it'd be nice if you could minimize the hurdle to debugging.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Mar 11, 2021 at 4:54 PM Kuttaiah Robin <kutta...@gmail.com> wrote:

> Hello,
>
> I have a use case where I need to read events (non-correlated) from a
> source Kafka topic, then correlate them and push them forward to another
> target topic.
>
> I use Spark Structured Streaming with FlatMapGroupsWithStateFunction along
> with GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I
> apply some correlation logic on the group's events and push the correlated
> events forward to another topic (via ForEachBatch). Non-correlated events
> are stored in the state until they are correlated in a future set of events.
>
> With this scenario, when I push a single event to the source topic, I see it
> come three times to FlatMapGroupsWithStateFunction (with separate timestamps)
> but only once in the ForEachBatch processor (which is good).
>
> The same event coming thrice in FlatMapGroupsWithStateFunction is a problem,
> as it causes issues with my correlation logic.
>
> Can someone help me understand why this is seen thrice
> in FlatMapGroupsWithStateFunction?
>
> Code snippets are shown below.
> Please let me know what is missing and how can I solve this,
>
> thanks,
> Robin Kuttaiah
>
> *StreamQuery*
>
> Dataset<MilestoneEvent> sessionUpdates = null;
> FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> idstateUpdateFunction =
>     new FlatMapIdFedGroupFunction(m_InsightEvent, m_InsightDeployment);
> try {
>   sessionUpdates = idFedKafkaEvents
>       .groupByKey(
>           new MapFunction<Row, String>() {
>             private static final long serialVersionUID = -797571731893988577L;
>             @Override
>             public String call(Row event) {
>               return event.getAs("EVENT_MODEL_ID_COL");
>             }
>           },
>           Encoders.STRING())
>       .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>           Encoders.bean(IdentifierConnector.class),
>           Encoders.bean(MilestoneEvent.class),
>           GroupStateTimeout.ProcessingTimeTimeout());
> } catch (Exception oException) {
>   //log and throw back exception
> }
>
> ForeachBatchProcessor oForeachBatch =
>     new ForeachBatchProcessor(m_InsightDeployment, m_InsightEvent, m_strQueryName);
> DataStreamWriter<MilestoneEvent> events = sessionUpdates
>     .writeStream()
>     .queryName(queryName)
>     .outputMode("append")
>     .trigger(Trigger.ProcessingTime("5 seconds"))
>     .option("checkpointLocation", checkpointLocation)
>     .foreachBatch(oForeachBatch);
>
> *FlatMapGroupsWithStateFunction:*
>
> public class FlatMapIdFedGroupFunction implements
>     FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> {
>
>   public FlatMapIdFedGroupFunction(InsightEvent iEvent, InsightDeployment iDeployment) {
>   }
>
>   @Override
>   public Iterator<MilestoneEvent> call(String key, Iterator<Row> events,
>       GroupState<IdentifierConnector> state) throws Exception {
>     List<MilestoneEvent> outputEvents = new ArrayList<MilestoneEvent>();
>     IdentifierConnector session = null;
>     IdFederationUtil.write("FlatMapIdFedGroupFunction invoked for " + key+" "+System.currentTimeMillis()); //Called thrice
>     if (!state.exists()) {
>       session = new IdentifierConnector();
>     } else {
>       session = state.get();
>     }
>     while (events.hasNext()) {
>       Row event = events.next();
>       MilestoneEvent mEventCurr = IdFederationUtil.getMilestoneEvent(event, insightEvent);
>       outputEvents.add(mEventCurr);
>       IdFederationUtil.write(".........."+mEventCurr.getMilestoneId()); //Called thrice
>       break;
>     }
>     return outputEvents.iterator();
>   }
> }
>
> *ForEachBatchFunction:*
>
> public class ForeachBatchProcessor implements
>     VoidFunction2<Dataset<MilestoneEvent>, Long>, Serializable {
>
>   private static final long serialVersionUID = 1L;
>
>   public ForeachBatchProcessor(InsightDeployment in_oInsightDeployment,
>       InsightEvent in_oInsightEvent, String in_strQueryName) {
>   }
>
>   public void call(Dataset<MilestoneEvent> in_Rows, Long in_lBatchID)
>       throws Exception {
>     if (in_Rows.count() == 0L) {
>       return;
>     }
>     IdFederationUtil.write("Processing batch " + in_lBatchID + " "+ in_Rows.count());
>     List<MilestoneEvent> events = in_Rows.collectAsList();
>     for(MilestoneEvent m: events) {
>       IdFederationUtil.write("......BATCH "+m.getMilestoneId());
>     }
>   }
>
> }