Hi Brian,

Sorry for you headache. We are aware that current join semantics in
Streams are not straight forward.

We did rework those already in trunk and this change will be included in
next release 0.10.2. Please build from trunk and let us know if this
resolves your issue.

For details, see this wiki page explaining current and new join semantics:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

For more details see the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-77%3A+Improve+Kafka+Streams+Join+Semantics


Long story short: leftJoin only triggers a join computation for the left
input while records of the right input only update the (right) input
KTable but do not compute a join result. Thus, if your right input data
arrives before you left input data it works -- however, if you left
input data arrives first, it will not enrich the stream but join with
"null".


-Matthias

On 12/7/16 9:25 AM, Brian Krahmer wrote:
> Hey guys,
> 
>   I'm having a hell of a time here.  I've worked for days trying to get
> this joining pipeline working.  I thought I had it working last week,
> but my jubilation was premature.  The point was to take data in from
> five different topics and merge them together to obtain one enriched
> event (output to compacted topic).  Can anybody spot what I'm doing
> wrong?  The ordering makes no difference.  For example, I've switched
> the locationInput and the vehicleReservedInput inputs in the leftJoin
> calls below, and I get the same results.  The location part of the
> enrichment works while the vehicleReserved part does not.  I can't even
> think of how to restructure the topology without resorting to building
> my own lower-level topology.
> 
> thanks,
> brian
> 
> 
> KTable<String, VehicleFinderData> fleetInput =
> builder.table(Serdes.String(),
>                 vehicleFinderDataSerde, FLEET_TOPIC,
> VEHICLE_ENRICHER_FLEET_STORE);
> ...
> fleetInput.print("fleetInput");
> locationInput.print("locationInput");
> vehicleReservedInput.print("vehicleReservedInput");
> vehicleReleasedInput.print("vehicleReleasedInput");
> vehicleUsageEndedInput.print("vehicleUsageEndedInput");
> 
> KTable<String, VehicleFinderData> mergeStepOne =
> fleetInput.leftJoin(locationInput, VehicleFinderData::merge);
> mergeStepOne.print("mergeStepOne");
> KTable<String, VehicleFinderData> mergeStepTwo =
> mergeStepOne.leftJoin(vehicleReleasedInput, VehicleFinderData::merge);
> mergeStepTwo.print("mergeStepTwo");
> KTable<String, VehicleFinderData> mergeStepThree =
> mergeStepTwo.leftJoin(vehicleUsageEndedInput, VehicleFinderData::merge);
> mergeStepThree.print("mergeStepThree");
> KTable<String, VehicleFinderData> mergeStepFour =
> mergeStepThree.leftJoin(vehicleReservedInput, VehicleFinderData::merge);
> mergeStepFour.print("mergeStepFour");
> 
> ** Generate a location event **
> 
> [locationInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json
> value}<-null)
> Deserializing from topic VehicleEnricherFleetStore
> Merge operation called
> [mergeStepOne]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json
> value}<-null)
> Merge operation called
> [mergeStepTwo]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json
> value}<-null)
> Merge operation called
> [mergeStepThree]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json
> value}<-null)
> Merge operation called
> [mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json
> value}<-null)
> 
> ** New event correctly serialized **
> 
> -------------------------------------------------------
> 
> ** Generate a vehicleReserved event **
> 
> [vehicleReservedInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped
> json value}<-null)
> [mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , (null<-null)
> 
> ** NO EVENT **
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to