Hi Marco,

I noticed that your window (JoinWindows.of(1000)) is 1 second wide, not 
1 minute wide. Is that intentional?
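
To illustrate the difference, here is a minimal sketch in plain Java (not 
Kafka Streams code; JoinWindowSketch and withinWindow are made-up names for 
this example). It assumes the usual join-window rule that two records with 
the same key are joinable when their timestamps differ by at most the window 
size, which is given in milliseconds:

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: JoinWindowSketch and withinWindow are hypothetical
// names, not part of the Kafka Streams API.
final class JoinWindowSketch {

    // Window sizes in milliseconds.
    static final long ONE_SECOND_MS = 1000L;                         // value used in the posted code
    static final long ONE_MINUTE_MS = TimeUnit.MINUTES.toMillis(1);  // value described in the mail

    // Two records are join candidates when their event timestamps differ
    // by at most windowMs (assumed rule, see the note above).
    static boolean withinWindow(long activityTsMs, long locationTsMs, long windowMs) {
        return Math.abs(activityTsMs - locationTsMs) <= windowMs;
    }

    public static void main(String[] args) {
        long locationTs = 0L;
        // An activity recorded 30 seconds after the location update.
        long activityTs = TimeUnit.SECONDS.toMillis(30);

        System.out.println("1 second window: " + withinWindow(activityTs, locationTs, ONE_SECOND_MS));
        System.out.println("1 minute window: " + withinWindow(activityTs, locationTs, ONE_MINUTE_MS));
    }
}
```

If 1 minute is what you want, the first argument would be 60 * 1000 rather 
than 1000.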

Thanks
Eno
> On 17 Apr 2017, at 19:41, Marco Abitabile <marco.abitab...@gmail.com> wrote:
> 
> Hello Eno,
> thanks for your support. Both streams are KStreams. The window is 
> 1 minute wide, with a retention (until) of 5 minutes. This is the code:
> 
> // Other stream: User Location, a string with the name of the city the
> // user is in (like "San Francisco")
> KStreamBuilder builder = new KStreamBuilder();
> KStream<String, String> userLocationStream = builder
>     .stream(stringSerde, stringSerde, "userLocationStreamData");
> KStream<String, String> locationKstream = userLocationStream
>     .map(MyStreamUtils::enhanceWithAreaDetails);
> locationKstream.to("user_location");
> // This stream: User Activity
> KStream<String, JsonObject> activity = builder
>     .stream(stringSerde, jsonSerde, "activityStreamData");
> activity.filter(MyStreamUtils::filterOutFakeUsers)
>     .map(MyStreamUtils::enhanceWithScoreDetails)
>     .join(
>         locationKstream,
>         MyStreamUtils::locationActivityJoiner,
>         JoinWindows.of(1000).until(1000 * 60 * 5),
>         stringSerde, jsonSerde, stringSerde)
>     .to("usersWithLocation");
> 
> KafkaStreams stream = new KafkaStreams(builder, propsActivity);
> stream.start();
> 
> 
> And MyStreamUtils::locationActivityJoiner does:
> 
> public static JsonObject locationActivityJoiner(JsonObject activity, String loc) {
>     JsonObject join = activity.copy();
>     join.put("city", loc);
>     return join;
> }
> 
> Hmm... your question got me thinking... are you telling me that since 
> both are KStreams, they actually need to be re-streamed in sync?
> 
> Thanks a lot.
> 
> Marco
> 
> 
> 2017-04-16 21:45 GMT+02:00 Eno Thereska <eno.there...@gmail.com>:
> Hi Marco,
> 
> Could you share a bit of your code, or at a minimum provide some info on:
> - are userActivitiesStream and geoDataStream KStreams or KTables?
> - what is the length of "timewindow"?
> 
> Thanks
> Eno
> 
> > On 16 Apr 2017, at 19:44, Marco Abitabile <marco.abitab...@gmail.com> wrote:
> >
> > Hi All!
> >
> > I need a little hint to understand how join works with regard to stream
> > synchronization.
> >
> > This mail is a bit long, as I need to explain the issue I'm facing.
> >
> > *TL;DR:*
> > it seems that join synchronization between streams is not respected as
> > explained here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-StreamSynchronization
> >
> > *The need:*
> > I have historical data residing in some databases, more specifically:
> >  - time series of user activities
> >  - time series of user geo positions
> >
> > *What I do:*
> > I have a new algorithm I want to try, but the historical data has already
> > been pruned by the Kafka retention policy, so I only have it in a database.
> > This is what I'm doing:
> >  1- spin up a kafka-connect sink that takes historical GPS data (let's say,
> > one day of data), ordered by event time, and pushes it into the
> > "HistoricalGpsData" topic. This task pushes historical geo data into the
> > Kafka topic as fast as possible, respecting the original event time.
> >  2- spin up a kafka-connect sink that takes historical user activities
> > (let's say, one day of data, the same day as the GPS data, of course),
> > ordered by event time, and pushes them into the "HistoricalUserActivites"
> > topic. This task pushes historical user activity data into the Kafka topic
> > as fast as possible, respecting the original event time.
> >  3- spin up my new stream processor algorithm
> >
> > Due to the nature of the data, the volume of activity data is much higher
> > than that of geo data, so task 1 pushes all the needed geo data into the
> > Kafka topic within a few minutes (around 10 minutes), while the activity
> > data, because of its higher volume, takes about 1 hour to be pushed entirely.
> > --> the two streams are pushed into Kafka regardless of their
> > synchronization (however, being aware of their nature, as explained above)
> >
> > *What I expect:*
> > Now, what I would expect is that when I perform the join between the two
> > streams:
> >
> >   userActivitiesStream.join(geoDataStream, timewindow...)
> >
> > the join takes the incoming user activities data and joins with the geo
> > data respecting the given time window.
> > As per the nature of the data, there is always a match (within the given
> > timeWindow) between user activities data with geo data (in fact, when this
> > data arrives in real time, there are no issues at all)
> >
> > So I expect that the join picks up from the topic the right geo data
> > (recall that geo data is pushed into the topic within 10 minutes) and joins
> > it with the user activities data (recall that user activities data is a
> > stream that takes around 1 hour)
> >
> > *What I get:*
> > What happens is that only the first few minutes of user data are actually
> > processed by the join; after that, user data keeps coming in but the join
> > doesn't join any data anymore.
> >
> > It seems that the join doesn't respect the time semantics (configured to be
> > the default strategy: event time) unless the two streams are synchronized
> > (actually, this happens during the first minutes, when I start the whole
> > reprocessing task).
> >
> >
> > Can you help me find the right clue? Do I have to push the two
> > streams in a synchronized fashion (such as simulating the real-time data flow,
> > as they came into the system the first time)?
> >
> > Thanks for your support.
> >
> > Best
> > Marco
> 
> 
