Hi,
Is using a session window to implement the logic below a good idea, or
should I use a process function instead?
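
To make the intended per-session logic concrete, here is a minimal sketch in
plain Java, outside Flink. The Event class and the SearchIdTagger name are
hypothetical stand-ins for illustration only (the real job uses Avro
GenericRecord), so treat this as a sketch under those assumptions, not as the
actual pipeline code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;

public class SearchIdTagger {

    /** Hypothetical stand-in for the GenericRecord used in the real job. */
    public static final class Event {
        public final String eventName;
        public final long eventTs;
        public String searchId; // stays null until a "search" event has been seen

        public Event(String eventName, long eventTs) {
            this.eventName = eventName;
            this.eventTs = eventTs;
        }
    }

    /**
     * Sorts one session's events by event time, then stamps every event with
     * the id generated by the most recent "search" event at or before it.
     */
    public static List<Event> tag(Iterable<Event> sessionEvents) {
        List<Event> sorted = new ArrayList<>();
        sessionEvents.forEach(sorted::add);

        // List.sort is stable: events with equal timestamps keep arrival order.
        sorted.sort(Comparator.comparingLong((Event e) -> e.eventTs));

        String currentSearchId = null;
        for (Event e : sorted) {
            if ("search".equals(e.eventName)) {
                currentSearchId = UUID.randomUUID().toString();
            }
            e.searchId = currentSearchId;
        }
        return sorted;
    }
}
```

The sketch keeps the events in a list instead of a map keyed by timestamp, so
two events that share the same event_ts are both retained.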

On Sun, Mar 1, 2020 at 11:39 AM aj <ajainje...@gmail.com> wrote:

> Hi ,
>
> I am working on a use case where I have a stream of events. I want to
> attach a unique id to all the events that happen in a session.
> Below is the logic that I am trying to implement:
>
> 1. session_started
> 2. Whenever an event with event_name=search arrives, generate a unique
> search_id and attach this id to all the following events in the session
> until a new "search" event is encountered in the session.
>
> Example :
> *user-1.  session-1   event_name- search (generate searchid --1)*
> user-1.  session-1   event_name- x  (attach above search id -1)
> user-1.  session-1   event_name- y (attach above search id -1)
> user-1.  session-1   event_name- y (attach above search id -1)
> *user-1.  session-1   event_name- search (generate searchid --2)*
> user-1.  session-1   event_name- x  (attach above search id -2)
> user-1.  session-1   event_name- y (attach above search id -2)
> user-1.  session-1   event_name- y (attach above search id -2)
>
> As events can arrive out of order, I want to do this after the session
> window has closed. So after the session window I do the following:
>
> 1. Sort all the events by time.
> 2. Iterate over each event and attach the search_id.
> 3. Collect all the events and emit another stream enriched with the search_id.
>
> I am trying the code below, but it is not working as expected, and I am
> not able to understand what is happening.
>
> dataStream.keyBy((KeySelector<GenericRecord, String>) record -> {
>     StringBuilder builder = new StringBuilder();
>     builder.append(record.get("session_id"));
>     builder.append(record.get("user_id"));
>     return builder.toString();
> }).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
>   .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
>     @Override
>     public void process(String key, Context context,
>             Iterable<GenericRecord> iterable,
>             Collector<GenericRecord> collector) throws Exception {
>         Stream<GenericRecord> result = IterableUtils.toStream(iterable);
>         List<GenericRecord> s = result.collect(Collectors.toList());
>
>         Map<Long, GenericRecord> recordMap = new HashMap<>();
>         for (GenericRecord record : s) {
>             recordMap.put((long) record.get("event_ts"), record);
>         }
>
>         Map<Long, GenericRecord> sortedRecordMap = new LinkedHashMap<>();
>         recordMap.entrySet().stream()
>                 .sorted(Map.Entry.comparingByKey())
>                 .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));
>
>         String search_id = null;
>         for (Map.Entry<Long, GenericRecord> element : sortedRecordMap.entrySet()) {
>             GenericRecord record = element.getValue();
>             if (record.get("event_name").equals("search")) {
>                 search_id = UUID.randomUUID().toString();
>             }
>             record.put("search_id", search_id);
>             collector.collect(record);
>         }
>     }
> }).print();
>
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>
