Hi,

Is using a session window a good way to implement the logic below, or should I use a process function instead?
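To make the question concrete, here is a minimal plain-Java sketch of the sort-and-attach step described in the quoted message, without any Flink types. `Event` and `SearchIdTagger` are hypothetical names standing in for the GenericRecord fields used in the thread; they are assumptions for illustration only. The sketch sorts a list rather than keying a map by event_ts, so events that share a timestamp are all kept.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for the GenericRecord fields used in the thread.
final class Event {
    final String eventName;
    final long eventTs;
    String searchId; // stays null for events that precede the first "search"

    Event(String eventName, long eventTs) {
        this.eventName = eventName;
        this.eventTs = eventTs;
    }
}

final class SearchIdTagger {
    /**
     * Returns the session's events sorted by timestamp, each tagged with the
     * id generated for the most recent "search" event at or before it.
     * Note: the input Event objects are mutated in place.
     */
    static List<Event> attachSearchIds(Iterable<Event> sessionEvents) {
        List<Event> sorted = new ArrayList<>();
        for (Event e : sessionEvents) {
            sorted.add(e);
        }
        // List.sort is stable, so events with equal timestamps keep arrival order.
        sorted.sort(Comparator.comparingLong((Event e) -> e.eventTs));

        String searchId = null;
        for (Event e : sorted) {
            if ("search".equals(e.eventName)) {
                searchId = UUID.randomUUID().toString(); // a new search starts here
            }
            e.searchId = searchId;
        }
        return sorted;
    }

    public static void main(String[] args) {
        List<Event> session = new ArrayList<>();
        session.add(new Event("x", 20L));      // arrives before its search event
        session.add(new Event("search", 10L));
        session.add(new Event("search", 30L));
        session.add(new Event("y", 40L));
        for (Event e : attachSearchIds(session)) {
            System.out.println(e.eventTs + " " + e.eventName + " " + e.searchId);
        }
    }
}
```

The same loop could presumably run over the buffered events of a session in either a window function or a process function; which of the two is the better fit is the open question here.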
On Sun, Mar 1, 2020 at 11:39 AM aj <ajainje...@gmail.com> wrote:

> Hi,
>
> I am working on a use case where I have a stream of events. I want to
> attach a unique id to all the events that happen in a session.
> Below is the logic that I am trying to implement:
>
> 1. session_started
> 2. Whenever an event with event_name=search arrives, generate a unique
>    search_id and attach this id to all the following events in the
>    session until a new "search" event is encountered in the session.
>
> Example:
> *user-1. session-1 event_name- search (generate search id 1)*
> user-1. session-1 event_name- x (attach search id 1 from above)
> user-1. session-1 event_name- y (attach search id 1 from above)
> user-1. session-1 event_name- y (attach search id 1 from above)
> *user-1. session-1 event_name- search (generate search id 2)*
> user-1. session-1 event_name- x (attach search id 2 from above)
> user-1. session-1 event_name- y (attach search id 2 from above)
> user-1. session-1 event_name- y (attach search id 2 from above)
>
> As events can arrive out of order, I want to do this after the session
> window is over. So after the session window I am doing the following:
>
> 1. Sort all the events by time.
> 2. Iterate over each event and attach the search_id.
> 3. Collect all the events and generate another stream enriched with the
>    search_id.
>
> I am trying with the code below, but it is not working as expected. I am
> not able to understand what is happening.
>
> dataStream.keyBy((KeySelector<GenericRecord, String>) record -> {
>     StringBuilder builder = new StringBuilder();
>     builder.append(record.get("session_id"));
>     builder.append(record.get("user_id"));
>     return builder.toString();
> }).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
>   .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
>     @Override
>     public void process(String key, Context context,
>             Iterable<GenericRecord> iterable,
>             Collector<GenericRecord> collector) throws Exception {
>         Stream<GenericRecord> result = IterableUtils.toStream(iterable);
>         List<GenericRecord> s = result.collect(Collectors.toList());
>
>         Map<Long, GenericRecord> recordMap = new HashMap<>();
>         for (GenericRecord record : s) {
>             recordMap.put((long) record.get("event_ts"), record);
>         }
>
>         Map<Long, GenericRecord> sortedRecordMap = new LinkedHashMap<>();
>         recordMap.entrySet().stream()
>                 .sorted(Map.Entry.comparingByKey())
>                 .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));
>
>         String search_id = null;
>         for (Map.Entry<Long, GenericRecord> element : sortedRecordMap.entrySet()) {
>             GenericRecord record = element.getValue();
>             if (record.get("event_name").equals("search")) {
>                 search_id = UUID.randomUUID().toString();
>             }
>             record.put("search_id", search_id);
>             collector.collect(record);
>         }
>     }
> }).print();
>
> --
> Thanks & Regards,
> Anuj Jain
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>

--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>
<http://www.cse.iitm.ac.in/%7Eanujjain/>