*Intro*

I am using Apache Flink to build a rather complex network of data streams. The idea is to implement a rule engine with Flink.
As a basic description of the application, this is how it is supposed to work:

Data is received by a Kafka consumer source and processed by a number of data streams until it is finally sent to a Kafka producer sink. The incoming data contains objects with a logical key ("object-id"), and several incoming messages may refer to the same object-id. For every given object-id, the order of its incoming messages must be retained throughout the application. The overall order of messages can be arbitrary.

This means that messages a, b and c of object1 must be processed in order, but message x of object2 might be processed before, after, or in between a1/b1/c1; it does not matter.

As I currently understand it, this means I must keyBy(_.objectID), so that messages of the same object are processed in the order they arrived.

*Current approach*

To implement the actual rule engine, a network of streams is created. The idea is the following:

- each rule has 1-n conditions
- for every condition of every rule, create a sub-stream of the original stream with .filter(_.matches(rule.condition))
- combine all sub-streams which correspond to the same rule by using substream1.connect(substream2).flatMap(new CombineFunction[WorkingMemory](...))
- connect can only join 2 streams, so a rule with 3 conditions results in 2 consecutive joins
- rules using the same condition re-use the same sub-stream created in the second step

This results in n joined streams, where n corresponds to the number of rules. Each joined stream has a map function appended to it, which marks the message so that we know that a rule matched.

Each joined/result stream may publish its result ("rule xyz matched") to the Kafka producer independently of the other results, so at this point I can attach the sink to the streams.
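To give a feel for how large such a network gets, here is a minimal plain-Scala sketch (no Flink involved) that counts the nodes the construction above would create. `Rule`, `checks` as a list of condition ids, and `networkSize` are hypothetical stand-ins for illustration only, and the sketch assumes that joined paths sharing the same condition prefix are re-used.

```scala
// Hypothetical stand-in: a rule is just an ordered list of condition ids.
final case class Rule(ruleID: String, checks: List[String])

// Returns (number of filter sub-streams, number of connect/join nodes).
// Assumption: one filter per distinct condition, and one join per distinct
// condition prefix of length >= 2 (prefixes shared between rules count once).
def networkSize(rules: List[Rule]): (Int, Int) = {
  val filters = rules.flatMap(_.checks).distinct.size
  val joinPaths = for {
    rule <- rules
    n    <- 2 to rule.checks.size // empty range for single-condition rules
  } yield rule.checks.take(n).mkString(":")
  (filters, joinPaths.distinct.size)
}

val rules = List(
  Rule("r1", List("a")),
  Rule("r2", List("a", "b")),
  Rule("r3", List("a", "b", "c")),
  Rule("r4", List("b", "c"))
)

// Conditions a, b, c give 3 filters; paths "a:b", "a:b:c", "b:c" give 3 joins.
val (filters, joins) = networkSize(rules)
```

In this example r2 and r3 share the "a:b" join, which is why four rules with seven conditions in total need only three filters and three joins.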
*Connect details*

Because the .connect of two streams ("condition" sub-streams) must only pass a message if it was received on both streams (i.e. both conditions matched), I need a RichCoFlatMapFunction with a keyed state, which can take care of the "pass only if it was already received on the other side" logic.

However, the problem is that the stream is keyed by object-id. So what happens if 2 messages of the same object run through the network and reach the .connect().flatMap(new RichCoFlatMapFunction...)? It will lead to wrong output. I would need to assign each incoming message a unique ID (UUID) upon entering the network, so that I can use this key (instead of the object-id) in the .connect().flatMap() join. But at the same time, I need the stream to be keyed by object-id, so that messages of the same object are processed in order. What to do?

To solve this, I kept the input stream with keyBy(_.objectID), but the RichCoFlatMapFunction in the stream join no longer uses the keyed state. Instead, I am using a simple operator state, which keeps a map of passed objects and implements the same logic, just with a manual key/value lookup.

This seems to work, but I don't know whether it introduces more issues.
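To illustrate the keying problem described above, here is a small plain-Scala sketch (no Flink, no real state backend). `Msg` and `PairingBuffer` are hypothetical names invented for this example; the buffer only mimics the "pass only if seen on the other side" logic with an exchangeable key function.

```scala
import scala.collection.mutable

// Hypothetical message type; only the two ids matter here.
final case class Msg(objectID: String, uuid: String)

// Two-sided buffer: a message is emitted only once its key was seen on both sides.
final class PairingBuffer(key: Msg => String) {
  private val waitingLeft  = mutable.Map[String, Msg]()
  private val waitingRight = mutable.Map[String, Msg]()

  // Message arrives on the left input.
  def left(m: Msg): Option[(Msg, Msg)] = waitingRight.remove(key(m)) match {
    case Some(other) => Some((m, other))
    case None =>
      waitingLeft(key(m)) = m
      None
  }

  // Message arrives on the right input.
  def right(m: Msg): Option[(Msg, Msg)] = waitingLeft.remove(key(m)) match {
    case Some(other) => Some((other, m))
    case None =>
      waitingRight(key(m)) = m
      None
  }
}

// Two different messages of the same object:
// m1 matched only the left condition, m2 matched only the right condition.
val m1 = Msg("obj1", "u1")
val m2 = Msg("obj1", "u2")

val byObject = new PairingBuffer(_.objectID)
byObject.left(m1)
val falseMatch = byObject.right(m2) // pairs m1 with m2 although neither matched both conditions

val byUuid = new PairingBuffer(_.uuid)
byUuid.left(m1)
val noMatch = byUuid.right(m2) // different uuids, so nothing is emitted
```

With the object-id as key, two unrelated messages of the same object complete each other's pair; with the per-message UUID as key they do not, which is the behaviour the join needs.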
*Visualization*

The Flink GUI renders the following images for a list of 14 rules with a total of 23 conditions (some rules only have one condition):

https://i.stack.imgur.com/fudsq.png

https://i.stack.imgur.com/8dHJw.png

*Code*

The network is created using this code:

```
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import org.apache.flink.streaming.api.scala._

val streamCache = mutable.Map[Int,DataStream[WorkingMemory]]()
val outputNodesCache = ListBuffer[DataStream[WorkingMemory]]()

if (rules.isEmpty)
  return

// create partial streams for all conditions (first level)
// cache the sub-stream with the hashcode of its condition as key (for re-use)
for (rule <- rules if rule.checks.nonEmpty ; cond <- rule.checks if !streamCache.contains(cond.hashCode()))
  streamCache += cond.hashCode -> sourceStream.filter(cond.matches _)

// create joined streams for combined conditions (sub-levels)
for (rule <- rules if rule.checks.nonEmpty) {
  val ruleName = rule.ruleID

  // for each rule, starting with the rule with the least conditions ...
  if (rule.checks.size == 1) {
    // ... create exit node if single-condition rule
    // each exit node applies the rule-name to the objects set of matched rules.
    outputNodesCache += streamCache(rule.checks.head.hashCode).map(obj => { obj.matchedRule = ListBuffer((ruleName, rule.objectType.mkString(":"), rule.statement)) ; obj })
  } else {
    // ... iterate all conditions, and join nodes into full rule-path (reusing existing intermediate paths)
    var sourceStream:DataStream[WorkingMemory] = streamCache(rule.checks.head.hashCode)
    var idString = rule.checks.head.idString

    for (i <- rule.checks.indices) {
      if (i == rule.checks.size-1) {
        // reached last condition of rule, create exit-node
        // each exit node applies the rule-name to the objects set of matched rules.
        val rn = ruleName
        val objectType = rule.objectType.mkString(":")
        val statement = rule.statement

        outputNodesCache += sourceStream.map(obj => { obj.matchedRule = ListBuffer((rn, objectType, statement)) ; obj })
      } else {
        // intermediate condition, create normal intermediate node
        val there = rule.checks(i+1)
        val connectStream = streamCache(there.hashCode)

        idString += (":" + there.idString)

        // try to re-use existing tree-segments
        // (note: in this snippet, joined streams are never put into streamCache)
        if (streamCache.contains(idString.hashCode))
          sourceStream = streamCache(idString.hashCode)
        else
          sourceStream = sourceStream.connect(connectStream).flatMap(new StatefulCombineFunction(idString))
      }
    }
  }
}

// connect each output-node to the sink
for (stream <- outputNodesCache) {
  stream.map(wm => RuleEvent.toXml(wm, wm.matchedRule.headOption)).addSink(sink)
}
```

The StatefulCombineFunction used in the previous snippet:

```
import scala.collection.mutable.ListBuffer
import org.apache.flink.api.common.state.ListState
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

class StatefulCombineFunction(id:String) extends RichCoFlatMapFunction[WorkingMemory, WorkingMemory, WorkingMemory] with CheckpointedFunction {
  @transient
  private var leftState:ListState[(String, WorkingMemory)] = _
  @transient
  private var rightState:ListState[(String, WorkingMemory)] = _

  // bufferedLeft holds messages waiting for their counterpart on the left input,
  // bufferedRight holds messages waiting for their counterpart on the right input
  private var bufferedLeft = ListBuffer[(String, WorkingMemory)]()
  private var bufferedRight = ListBuffer[(String, WorkingMemory)]()

  override def flatMap1(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedLeft, bufferedRight, xmlObject, out, "left")
  override def flatMap2(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedRight, bufferedLeft, xmlObject, out, "right")

  def combine(leftState: ListBuffer[(String, WorkingMemory)], rightState: ListBuffer[(String, WorkingMemory)], xmlObject:WorkingMemory, out: Collector[WorkingMemory], side:String): Unit = {
    // look for the same message (by uuid) that already arrived on the other input
    val otherIdx:Int = leftState.indexWhere(_._1 == xmlObject.uuid)

    if (otherIdx > -1) {
      // seen on both inputs: emit and forget
      out.collect(leftState(otherIdx)._2)
      leftState.remove(otherIdx)
    } else {
      // first arrival: remember it until the other input delivers it
      rightState += ((xmlObject.uuid, xmlObject))
    }
  }

  override def initializeState(context:FunctionInitializationContext): Unit = ???
  override def snapshotState(context:FunctionSnapshotContext):Unit = ???
}
```

*Background information*

This application is meant to implement the Rete algorithm for rule matching using Flink (https://en.wikipedia.org/wiki/Rete_algorithm).

A different approach would be to just loop over all rules for every incoming message and attach the result. I already have a working implementation of that approach using Flink, so please don't suggest it as a solution.

*Issues*

The problem is that the application messes up the order of incoming messages at the object-id level. That is, it does not achieve what I required in the intro: for each object-id, the incoming messages must keep their order, but this is not the case.

I don't know at which point in the code the order gets messed up, or how those operations are distributed among threads, so I don't know how to solve this issue.

Best regards
Patrick Fial