Hello,

I have a use case where I need to read uncorrelated events from a source
Kafka topic, correlate them, and push the results to a target topic.

I use Spark Structured Streaming with a FlatMapGroupsWithStateFunction and
GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I apply some
correlation logic to the events in a group and push the correlated events to
the target topic (via foreachBatch). Uncorrelated events are kept in the
state until they can be correlated with events that arrive later.
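To make the intended state handling concrete, here is a plain-Java toy model of it. It is not Spark code: PendingCorrelator, its methods, and the arrival-order pairing rule are hypothetical placeholders for the real correlation logic, and the HashMap only stands in for GroupState.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (plain Java, not Spark): buffers uncorrelated events per key and,
// on each timeout, emits whatever can be correlated while keeping the rest.
final class PendingCorrelator {
  // Uncorrelated events per group key; stands in for the GroupState contents.
  private final Map<String, List<String>> pending = new HashMap<>();

  // New events for a key are appended to that key's buffer.
  void onEvents(String key, List<String> events) {
    pending.computeIfAbsent(key, k -> new ArrayList<>()).addAll(events);
  }

  // On a timeout, apply a hypothetical rule: events correlate in arrival-order
  // pairs; an unmatched trailing event stays buffered for a later batch.
  List<String> onTimeout(String key) {
    List<String> buffered = pending.getOrDefault(key, new ArrayList<>());
    List<String> emitted = new ArrayList<>();
    int i = 0;
    for (; i + 1 < buffered.size(); i += 2) {
      emitted.add(buffered.get(i) + "+" + buffered.get(i + 1));
    }
    List<String> leftover = new ArrayList<>(buffered.subList(i, buffered.size()));
    if (leftover.isEmpty()) {
      pending.remove(key);        // roughly what GroupState.remove() would do
    } else {
      pending.put(key, leftover); // roughly what GroupState.update(...) would do
    }
    return emitted;
  }

  int pendingCount(String key) {
    return pending.getOrDefault(key, List.of()).size();
  }

  public static void main(String[] args) {
    PendingCorrelator c = new PendingCorrelator();
    c.onEvents("model-1", List.of("e1"));
    System.out.println(c.onTimeout("model-1"));    // prints [] (e1 waits in state)
    c.onEvents("model-1", List.of("e2", "e3"));
    System.out.println(c.onTimeout("model-1"));    // prints [e1+e2]
    System.out.println(c.pendingCount("model-1")); // prints 1 (e3 still pending)
  }
}
```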

With this setup, when I push a single event to the source topic, I see it
arrive three times in the FlatMapGroupsWithStateFunction (each time with a
different timestamp) but only once in the foreachBatch processor (which is
good).

Receiving the same event three times in the FlatMapGroupsWithStateFunction
is a problem because it breaks my correlation logic.

Can someone help me understand why the event shows up three times in the
FlatMapGroupsWithStateFunction?

The code snippets are below. Please let me know what is missing and how I
can solve this.

Thanks,
Robin Kuttaiah

*StreamQuery*
Dataset<MilestoneEvent> sessionUpdates = null;
FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> idstateUpdateFunction =
    new FlatMapIdFedGroupFunction(m_InsightEvent, m_InsightDeployment);
try {
  sessionUpdates = idFedKafkaEvents
      .groupByKey(
          new MapFunction<Row, String>() {
            private static final long serialVersionUID = -797571731893988577L;

            @Override
            public String call(Row event) {
              return event.getAs("EVENT_MODEL_ID_COL");
            }
          },
          Encoders.STRING())
      .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
          Encoders.bean(IdentifierConnector.class),
          Encoders.bean(MilestoneEvent.class),
          GroupStateTimeout.ProcessingTimeTimeout());
} catch (Exception oException) {
  // log and throw back exception
}

ForeachBatchProcessor oForeachBatch =
    new ForeachBatchProcessor(m_InsightDeployment, m_InsightEvent, m_strQueryName);
DataStreamWriter<MilestoneEvent> events = sessionUpdates
    .writeStream()
    .queryName(queryName)
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .option("checkpointLocation", checkpointLocation)
    .foreachBatch(oForeachBatch);


*FlatMapGroupsWithStateFunction:*
public class FlatMapIdFedGroupFunction implements
    FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> {

  private final InsightEvent insightEvent;

  public FlatMapIdFedGroupFunction(InsightEvent iEvent, InsightDeployment iDeployment) {
    this.insightEvent = iEvent;
  }

  @Override
  public Iterator<MilestoneEvent> call(String key, Iterator<Row> events,
      GroupState<IdentifierConnector> state) throws Exception {
    List<MilestoneEvent> outputEvents = new ArrayList<MilestoneEvent>();
    IdentifierConnector session = null;
    IdFederationUtil.write("FlatMapIdFedGroupFunction invoked for " + key + "  "
        + System.currentTimeMillis()); // Called thrice
    // Start a new session for an unseen key, otherwise resume the stored one.
    if (!state.exists()) {
      session = new IdentifierConnector();
    } else {
      session = state.get();
    }
    while (events.hasNext()) {
      Row event = events.next();
      MilestoneEvent mEventCurr = IdFederationUtil.getMilestoneEvent(event, insightEvent);
      outputEvents.add(mEventCurr);
      IdFederationUtil.write(".........." + mEventCurr.getMilestoneId()); // Called thrice
      break; // only the first event of the group is handled
    }
    return outputEvents.iterator();
  }
}


*ForEachBatchFunction:*

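import java.io.Serializable;
import java.util.List;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;

public class ForeachBatchProcessor implements
    VoidFunction2<Dataset<MilestoneEvent>, Long>, Serializable {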

  private static final long serialVersionUID = 1L;

  public ForeachBatchProcessor(InsightDeployment in_oInsightDeployment,
      InsightEvent in_oInsightEvent, String in_strQueryName) {
  }

  public void call(Dataset<MilestoneEvent> in_Rows, Long in_lBatchID)
      throws Exception {
    if (in_Rows.count() == 0L) {
      return;
    }
    IdFederationUtil.write("Processing batch " + in_lBatchID + "  "+
in_Rows.count());
    List<MilestoneEvent> events = in_Rows.collectAsList();
    for(MilestoneEvent m: events) {
      IdFederationUtil.write("......BATCH "+m.getMilestoneId());
    }
  }

}
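One thing that may be worth ruling out (an assumption, not a confirmed diagnosis): call() above runs three actions on in_Rows: count(), count() again inside the log line, and collectAsList(). The Structured Streaming guide notes that writing the foreachBatch Dataset more than once can cause it to be recomputed and suggests caching it; count() and collectAsList() are actions too, so plausibly each one re-runs the stateful function, while the per-row "BATCH" loop runs only once. The plain-Java toy below (not Spark; LazyBatch and RecomputeDemo are hypothetical names) models that pattern: three actions on an uncached lazy batch invoke the per-group stand-in three times, and only once after persisting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model (plain Java, not Spark) of a lazily evaluated micro-batch:
// unless persisted, every action re-runs the whole plan.
final class LazyBatch<T> {
  private final Supplier<List<T>> plan; // recomputes the batch contents
  private boolean persisted = false;
  private List<T> cache = null;

  LazyBatch(Supplier<List<T>> plan) {
    this.plan = plan;
  }

  // Loosely analogous to Dataset.persist(): later actions share one computation.
  void persist() {
    persisted = true;
  }

  // Loosely analogous to Dataset.unpersist(): drops the cached result.
  void unpersist() {
    persisted = false;
    cache = null;
  }

  private List<T> evaluate() {
    if (!persisted) {
      return plan.get();  // no cache: recompute on every action
    }
    if (cache == null) {
      cache = plan.get(); // first action computes and caches
    }
    return cache;
  }

  long count() {
    return evaluate().size();
  }

  List<T> collectAsList() {
    return new ArrayList<>(evaluate());
  }
}

final class RecomputeDemo {
  // Performs the same three actions as ForeachBatchProcessor.call() and
  // returns how often the stand-in for the stateful function ran.
  static int invocations(boolean persist) {
    int[] calls = {0};
    LazyBatch<String> batch = new LazyBatch<String>(() -> {
      calls[0]++; // stands in for FlatMapIdFedGroupFunction.call()
      List<String> out = new ArrayList<>();
      out.add("milestone-1");
      return out;
    });
    if (persist) {
      batch.persist();
    }
    if (batch.count() == 0L) { // action 1
      return calls[0];
    }
    batch.count();             // action 2 (the "Processing batch" log line)
    batch.collectAsList();     // action 3
    batch.unpersist();
    return calls[0];
  }

  public static void main(String[] args) {
    System.out.println(invocations(false)); // prints 3
    System.out.println(invocations(true));  // prints 1
  }
}
```

In Spark itself, the corresponding change would be to call in_Rows.persist() before the first action in call() and in_Rows.unpersist() after the last one, as the guide suggests when a foreachBatch Dataset is used more than once.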
