Hi Mathieu, join semantics are tricky. We are still working on better documentation for them...
For the current state and your question:

Each time a record is processed, the join looks up the other KTable to see whether it holds a matching record. If none is found, the join result is empty and a tombstone record <key:null> is sent downstream. This deletes any previous join result that may exist for this key; keep in mind that the result is a KTable containing the current state of the join.

This happens in both directions. Thus, if the first records of your two streams do not match on the key, each of them results in a <key:null> message that deletes a possibly existing join tuple from the result KTable.

Does this make sense to you?

-Matthias

On 07/20/2016 04:09 PM, Mathieu Fenniak wrote:
> Hello Kafka users,
>
> I'm seeing some unexpected results when using Kafka Streams, and I was
> hoping someone could explain them to me. I have two streams, which I've
> converted KStream->KTable, and then I am joining them together with a
> "join" (not an outer join, not a full join). With the resulting KTable
> from the join, I am performing a foreach.
>
> When I startup my Streams application, my foreach receives two records with
> valid keys but null values *before* my ValueJoiner ever gets executed. Why
> would that be?
>
> Code excerpt; please excuse the use of Kotlin here:
>
> val builder = KStreamBuilder()
>
> val approvalStatus = builder.table(
>     Serdes.String(),
>     JsonSerde(TimesheetApprovalStatusChangedMessage::class.java),
>     "TimesheetApprovalStatusChanged"
> )
>
> val timesheetLastApprovalAction = builder.table(
>     Serdes.String(),
>     JsonSerde(Map::class.java),
>     "TimesheetApprovalActionPerformed"
> )
>
> val timesheetStatus = approvalStatus.join(timesheetLastApprovalAction, {
>     approvalStatus, lastApprovalAction ->
>     println("EXECUTING ValueJoiner")
>     computeTimesheetStatus(approvalStatus.approvalStatus!!, lastApprovalAction as Map<String, Any?>)
> })
>
> timesheetStatus.foreach({ timesheetKey, timesheetStatus ->
>     println("EXECUTING ForeachAction: $timesheetKey, status: $timesheetStatus")
>     if (timesheetStatus == null) {
>         println("SKIPPING NULL I DON'T UNDERSTAND")
>     }
> })
>
>
> Resulting console output:
>
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> SKIPPING NULL I DON'T UNDERSTAND
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: null
> SKIPPING NULL I DON'T UNDERSTAND
> EXECUTING ValueJoiner
> EXECUTING ForeachAction: mfenniak/f2abd0e8-a5f9-4560-8c55-dbc9d754b982, status: urn:replicon:timesheet-status:submitting
>
>
> Any explanation on why the foreach would be executing for data that hasn't
> been generated by my join?
>
> Thanks,
>
> Mathieu
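
To make the lookup-and-tombstone behavior described in the reply more concrete, here is a small toy model in Kotlin. It is only a sketch of the semantics as explained in this thread and does not use or reproduce the Kafka Streams implementation; ToyTableJoin, onLeft, onRight, and emitted are hypothetical names invented for this illustration, and the keys and values are made up.

```kotlin
// Toy model of the KTable-KTable inner join behavior described in this thread.
// NOT the Kafka Streams implementation; every name here is hypothetical.
class ToyTableJoin<K : Any, L : Any, R : Any, V : Any>(
    private val joiner: (L, R) -> V
) {
    // Current state of each input table, keyed by record key.
    private val left = mutableMapOf<K, L>()
    private val right = mutableMapOf<K, R>()

    // Everything sent downstream; a null value stands for a tombstone.
    val emitted = mutableListOf<Pair<K, V?>>()

    fun onLeft(key: K, value: L) {
        left[key] = value
        emit(key)
    }

    fun onRight(key: K, value: R) {
        right[key] = value
        emit(key)
    }

    // Look up both sides for the key. If either side is missing, the join
    // result is empty and a <key:null> tombstone is sent downstream.
    private fun emit(key: K) {
        val l = left[key]
        val r = right[key]
        val result: V? = if (l != null && r != null) joiner(l, r) else null
        emitted.add(key to result)
    }
}

fun main() {
    val join = ToyTableJoin<String, String, String, String> { status, action ->
        "$status/$action"
    }
    join.onLeft("timesheet-1", "open")      // no right-side match yet
    join.onRight("timesheet-2", "approve")  // no left-side match for this key
    join.onRight("timesheet-1", "submit")   // both sides present: joiner runs
    join.emitted.forEach { (key, value) -> println("$key -> $value") }
}
```

In this model the first two records each produce a null-valued result because the other side has nothing for their key, and the joiner only runs for the third record. A foreach on the join result would therefore see null values before the first joined value, which is the pattern reported above.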