Hi Henri,

#1 - This is by design. Event time advances with the slowest input source. If
there are input sources that generate no data, this is indistinguishable from a
slow source. Kafka topics where some partitions receive no data are a problem
in this regard, but there isn't a simple solution. If possible, I would address
it at the source.
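The low-watermark rule above can be sketched without any Flink code: the event-time clock of an operator is the minimum watermark across all of its input channels, so a single idle channel (such as an empty Kafka partition) pins the clock at its initial value and no event-time windows ever fire. A minimal illustration (the class and method names here are mine for illustration, not Flink API):

```java
import java.util.Arrays;

public class WatermarkMin {
    // Downstream event-time progress is the minimum watermark
    // over all input channels feeding an operator.
    static long combinedWatermark(long[] perChannelWatermarks) {
        return Arrays.stream(perChannelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Four active partitions: the clock advances to t=1000.
        long[] allActive = {1000, 1000, 1000, 1000};
        // Same, plus one idle partition still at its initial watermark:
        // the combined clock stays at Long.MIN_VALUE, so windows never close.
        long[] withIdle = {1000, 1000, 1000, 1000, Long.MIN_VALUE};

        System.out.println(combinedWatermark(allActive));
        System.out.println(combinedWatermark(withIdle));
    }
}
```

This is why running with parallelism above the number of active partitions stalls the windows: the consumer instances assigned only empty partitions never advance their watermark.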
#2 - If it's possible to run these init functions just once, when you submit
the job, you can run them in the constructor of your FoldFunction. This init
will then happen exactly once (on the client), and the constructed FoldFunction
is then serialized and distributed around the cluster. If this doesn't work
because you need something truly dynamic, you could also accomplish this with a
simple local variable in your function:

class MyFoldFunction extends FoldFunction {
  private var initialized = false

  def fold(accumulator: T, value: O): T = {
    if (!initialized) {
      doInitStuff()
      initialized = true
    }
    doNormalStuff()
  }
}

#3 - One way to do this is, as you've said, to attach the profile information
to the event, using a mapper, before it enters the window operations.

On Mon, Jan 2, 2017 at 1:25 AM, Henri Heiskanen <henri.heiska...@gmail.com> wrote:

> Hi,
>
> I have a few questions related to Flink streaming. I am on 1.2-SNAPSHOT, and
> what I would like to accomplish is to have a stream that reads data from
> multiple Kafka topics, identifies user sessions, uses an external user
> profile to enrich the data, evaluates a script to produce session
> aggregates, and then creates updated profiles from the session aggregates.
> I am working with high-volume data and user sessions may be long, so using
> a generic window apply might not work. Below is a simplification of the
> stream.
>
> stream = createKafkaStreams(...);
> env.setParallelism(4);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> stream
>     .keyBy(2)
>     .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
>     .fold(new SessionData(), new SessionFold(), new ProfilerApply())
>     .print();
>
> The questions:
>
> 1. Initially, when I used event-time windowing, I could not get any of my
> windows to close. The reason seemed to be that I had 6 partitions in my
> test Kafka setup and only 4 of them generated traffic. If I used
> parallelism above 4, then no windows were closed.
> Is this by design or a defect? We use flink-connector-kafka-0.10 because
> earlier versions did not commit the offsets correctly.
>
> 2. Rich fold functions are not supported. However, I would like to execute
> a piece of custom script in the fold function that requires an
> initialisation step. I would have used the open and close lifecycle methods
> of rich functions, but they are not available for fold. What would be the
> preferred way to run some initialisation routines (and close them
> gracefully) when using fold?
>
> 3. Kind of related to the above. I would also like to fetch a user profile
> from an external source at the beginning of the session. What would be a
> best practice for that kind of operation? If I were using the generic
> window apply, I could fetch it at the beginning of the apply method. I was
> thinking of introducing a mapper that fetches this profile periodically and
> caches it in Flink state. However, with this setup I would not be able to
> tie it to the user sessions identified by the windows.
>
> 4. I may also have an additional requirement of writing out each event
> enriched with the current session and profile data. I basically could do
> this again with a generic window function and write out each event with the
> collector when iterating, but would there be a better pattern to use? Maybe
> sharing state between functions or something.
>
> Br,
> Henri H

--
Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com