Hello John,

You can achieve this with a process window function, which lets you enrich the aggregated data with the metadata of the window (for example its start timestamp). Please have a look at the training example[1] to see how to access the window information within a process window function.
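A minimal sketch of the idea in plain Java, with no Flink classes (the class and method names are hypothetical and only for illustration; this is not the code from the linked training example). The key holds only the domain and path, and the minute timestamp is taken from the window start instead of from `Instant.now()` inside the key selector. In a Flink `ProcessWindowFunction` the equivalent value should be available through `context.window().getStart()`.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; it imitates a one-minute tumbling window by hand.
class WindowedClickCountSketch {

    // Start of the tumbling window containing timestampMillis (floor to the window size).
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // Counts clicks per (domain, path) for one window. The timestamp in the
    // result id comes from the window, never from the key of the record.
    static Map<String, Long> countWindow(long windowStartMillis, List<String[]> clicks) {
        Map<String, Long> counts = new LinkedHashMap<>();
        String start = Instant.ofEpochMilli(windowStartMillis).toString();
        for (String[] click : clicks) {
            String id = start + "|" + click[0] + "|" + click[1];
            counts.merge(id, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // Wall-clock time used in the example from this thread.
        long wallTime = Instant.parse("2022-02-14T11:38:01.123Z").toEpochMilli();
        long start = windowStart(wallTime, oneMinute);

        List<String[]> clicks = List.of(
                new String[] {"cnn.com", "/some-article"},
                new String[] {"cnn.com", "/some-article"},
                new String[] {"cnn.com", "/another-article"},
                new String[] {"cnn.com", "/some-article"});

        countWindow(start, clicks)
                .forEach((id, count) -> System.out.println(id + " count = " + count));
    }
}
```

Because the key no longer depends on the system clock, calling the key selector twice on the same click returns the same key, which is the property the earlier replies in this thread ask for.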
Sincerely,

Ali

[1]: https://github.com/ververica/flink-training/blob/master/troubleshooting/introduction/src/main/java/com/ververica/flink/training/exercises/TroubledStreamingJob.java#L155

On Mon, Feb 14, 2022 at 5:43 PM John Smith <java.dev....@gmail.com> wrote:

> Hi, I get that, but I want to output that key so I can store it in Elastic
> grouped by the minute.
>
> I had explained with data examples above. But just to be sure...
>
> Let's pretend the current WALL time is 2022-02-14T11:38:01.123Z and I get
> the clicks below:
>
> event time here (ignored/not read)|cnn.com|/some-article
> event time here (ignored/not read)|cnn.com|/some-article
> event time here (ignored/not read)|cnn.com|/another-article
> event time here (ignored/not read)|cnn.com|/some-article
>
> The output should be...
>
> 2022-02-14T11:38:00.000Z (this is the wall time rounded to the minute)|cnn.com|some-article count = 3
> 2022-02-14T11:38:00.000Z (this is the wall time rounded to the minute)|cnn.com|another-article count = 1
>
> On Mon, Feb 14, 2022 at 10:08 AM Ali Bahadir Zeybek <a...@ververica.com> wrote:
>
>> Hello John,
>>
>> That is exactly what the window operator does for you. Can you please check
>> the documentation[1] and let us know what part of the window operator alone
>> does not suffice for the use case?
>>
>> Sincerely,
>>
>> Ali
>>
>> [1]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows
>>
>> On Mon, Feb 14, 2022 at 4:03 PM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Because I want to group them for the last X minutes. In this case, the
>>> last 1 minute.
>>>
>>> On Mon, Feb 14, 2022 at 10:01 AM Ali Bahadir Zeybek <a...@ververica.com> wrote:
>>>
>>>> Hello John,
>>>>
>>>> Then may I ask you why you need to use a time attribute as part of your key?
>>>> Why not just key by the fields like `mydomain.com` and `some-article` >>>> in your >>>> example and use only window operator for grouping elements based on >>>> time? >>>> >>>> Sincerely, >>>> >>>> Ali >>>> >>>> On Mon, Feb 14, 2022 at 3:55 PM John Smith <java.dev....@gmail.com> >>>> wrote: >>>> >>>>> Hi, thanks. As previously mentioned, processing time. So I >>>>> regardless when the event was generated I want to count all events I have >>>>> right now (as soon as they are seen by the flink job). >>>>> >>>>> On Mon, Feb 14, 2022 at 4:16 AM Ali Bahadir Zeybek <a...@ververica.com> >>>>> wrote: >>>>> >>>>>> Hello John, >>>>>> >>>>>> Currently you are grouping the elements two times based on some time >>>>>> attribute, one while keying - with event time - and one while >>>>>> windowing - with >>>>>> processing time. Therefore, the windowing mechanism produces a new >>>>>> window >>>>>> computation when you see an element with the same key but arrived >>>>>> later from >>>>>> the previous window start and end timestamps. Can you please clarify >>>>>> with >>>>>> which notion of time you would like to handle the stream of data? >>>>>> >>>>>> Sincerely, >>>>>> >>>>>> Ali >>>>>> >>>>>> On Fri, Feb 11, 2022 at 6:43 PM John Smith <java.dev....@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Ok I used the method suggested by Ali. The error is gone. But now I >>>>>>> see multiple counts emitted for the same key... >>>>>>> >>>>>>> DataStream<MyEvent> slStream = env.fromSource(kafkaSource, >>>>>>> WatermarkStrategy.noWatermarks(), "Kafka Source") >>>>>>> .uid(kafkaTopic).name(kafkaTopic) >>>>>>> .setParallelism(kafkaParallelism) >>>>>>> .flatMap(new MapToMyEvent("my-event", windowSizeMins, >>>>>>> "message")) <------ Timestamp in GMT created here rounded to the >>>>>>> closest minute down. 
>>>>>>> .uid("map-json-logs").name("map-json-logs"); >>>>>>> >>>>>>> slStream.keyBy(new MinutesKeySelector()) >>>>>>> >>>>>>> .window(TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins))) >>>>>>> <---- Tumbling window of 1 minute. >>>>>>> >>>>>>> >>>>>>> >>>>>>> So below you will see a new count was emitted at 16:51 and 16:55 >>>>>>> >>>>>>> {"countId":"2022-02-11T16:50:00Z|mydomain.com >>>>>>> |/some-article","countDateTime":"2022-02-11T16:50:00Z","domain":" >>>>>>> mydomain.com","uri":"/some-article","count":3542} >>>>>>> ----- >>>>>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com >>>>>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":" >>>>>>> mydomain.com","uri":"/some-article","count":16503} >>>>>>> {"countId":"2022-02-11T16:51:00Z|mydomain.com >>>>>>> |/some-article","countDateTime":"2022-02-11T16:51:00Z","domain":" >>>>>>> mydomain.com","uri":"/some-article","count":70} >>>>>>> ----- >>>>>>> >>>>>>> {"countId":"2022-02-11T16:52:00Z|mydomain.com >>>>>>> |/some-article","countDateTime":"2022-02-11T16:52:00Z","domain":" >>>>>>> mydomain.com","uri":"/some-article","count":16037} >>>>>>> {"countId":"2022-02-11T16:53:00Z|mydomain.com >>>>>>> |/some-article","countDateTime":"2022-02-11T16:53:00Z","domain":" >>>>>>> mydomain.com","uri":"/some-article","count":18679} >>>>>>> {"countId":"2022-02-11T16:54:00Z|mydomain.com >>>>>>> |/some-article","countDateTime":"2022-02-11T16:54:00Z","domain":" >>>>>>> mydomain.com","uri":"/some-article","count":17697} >>>>>>> ----- >>>>>>> >>>>>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com >>>>>>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":" >>>>>>> mydomain.com","uri":"/some-article","count":18066} >>>>>>> {"countId":"2022-02-11T16:55:00Z|mydomain.com >>>>>>> |/some-article","countDateTime":"2022-02-11T16:55:00Z","domain":" >>>>>>> mydomain.com","uri":"/some-article","count":58} >>>>>>> ----- >>>>>>> {"countId":"2022-02-11T16:56:00Z|mydomain.com >>>>>>> 
|/some-article","countDateTime":"2022-02-11T16:56:00Z","domain":" >>>>>>> mydomain.com","uri":"/some-article","count":17489} >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Mon, Feb 7, 2022 at 12:44 PM John Smith <java.dev....@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Ok I think Ali's solution makes the most sense to me. I'll try it >>>>>>>> and let you know. >>>>>>>> >>>>>>>> On Mon, Feb 7, 2022 at 11:44 AM Jing Ge <j...@ververica.com> wrote: >>>>>>>> >>>>>>>>> Hi John, >>>>>>>>> >>>>>>>>> your getKey() implementation shows that it is not deterministic, >>>>>>>>> since calling it with the same click instance multiple times will >>>>>>>>> return >>>>>>>>> different keys. For example a call at 12:01:59.950 and a call at >>>>>>>>> 12:02:00.050 with the same click instance will return two different >>>>>>>>> keys: >>>>>>>>> >>>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name >>>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name >>>>>>>>> >>>>>>>>> best regards >>>>>>>>> Jing >>>>>>>>> >>>>>>>>> On Mon, Feb 7, 2022 at 5:07 PM John Smith <java.dev....@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Maybe there's a misunderstanding. But basically I want to >>>>>>>>>> do clickstream count for a given "url" and for simplicity and >>>>>>>>>> accuracy of >>>>>>>>>> the count base it on processing time (event time doesn't matter as >>>>>>>>>> long as >>>>>>>>>> I get a total of clicks at that given processing time) >>>>>>>>>> >>>>>>>>>> So regardless of the event time. I want all clicks for the >>>>>>>>>> current processing time rounded to the minute per link. >>>>>>>>>> >>>>>>>>>> So, if now was 2022-04-07T12:01:00.000Z >>>>>>>>>> >>>>>>>>>> Then I would want the following result... >>>>>>>>>> >>>>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-article-name count = 10 >>>>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|some-other-article count = 2 >>>>>>>>>> 2022-04-07T12:01:00.000Z|cnn.com|another-article count = 15 >>>>>>>>>> .... 
>>>>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-article-name count = 30 >>>>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|some-other-article count = 1 >>>>>>>>>> 2022-04-07T12:02:00.000Z|cnn.com|another-article count = 10 >>>>>>>>>> And so on... >>>>>>>>>> >>>>>>>>>> @Override >>>>>>>>>> public MyEventCountKey getKey(final MyEvent click) throws >>>>>>>>>> Exception >>>>>>>>>> { >>>>>>>>>> MyEventCountKey key = new MyEventCountKey( >>>>>>>>>> Instant.from(roundFloor(Instant.now().atZone(ZoneId.of("UTC")), >>>>>>>>>> ChronoField.MINUTE_OF_HOUR, windowSizeMins)).toString(), >>>>>>>>>> click.getDomain(), // cnn.com >>>>>>>>>> click.getPath(), // /some-article-name >>>>>>>>>> ); >>>>>>>>>> return key; >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Mon, Feb 7, 2022 at 10:48 AM David Morávek <d...@apache.org> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> The key selector works. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> No it does not ;) It depends on the system time so it's not >>>>>>>>>>> deterministic (you can get different keys for the very same >>>>>>>>>>> element). >>>>>>>>>>> >>>>>>>>>>> How do you key a count based on the time. I have taken this from >>>>>>>>>>>> samples online. >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> This is what the windowing is for. You basically want to group / >>>>>>>>>>> combine elements per key and event time window [1]. >>>>>>>>>>> >>>>>>>>>>> [1] >>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/ >>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> D. >>>>>>>>>>> >>>>>>>>>>> On Mon, Feb 7, 2022 at 3:44 PM John Smith < >>>>>>>>>>> java.dev....@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> The key selector works. It only causes an issue if there too >>>>>>>>>>>> many keys produced in one shot. For example of 100 "same" keys are >>>>>>>>>>>> produced >>>>>>>>>>>> for that 1 minutes it's ok. But if 101 are produced the error >>>>>>>>>>>> happens. 
>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> If you look at the reproducer at least that's what's hapenning >>>>>>>>>>>> >>>>>>>>>>>> How do you key a count based on the time. I have taken this >>>>>>>>>>>> from samples online. >>>>>>>>>>>> >>>>>>>>>>>> The key is that particular time for that particular URL path. >>>>>>>>>>>> >>>>>>>>>>>> So cnn.com/article1 was clicked 10 times at 2022-01-01T10:01:00 >>>>>>>>>>>> >>>>>>>>>>>> On Mon., Feb. 7, 2022, 8:57 a.m. Chesnay Schepler, < >>>>>>>>>>>> ches...@apache.org> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Your Key selector doesn't need to implement hashCode, but >>>>>>>>>>>>> given the same object it has to return the same key. >>>>>>>>>>>>> In your reproducer the returned key will have different >>>>>>>>>>>>> timestamps, and since the timestamp is included in the hashCode, >>>>>>>>>>>>> they will >>>>>>>>>>>>> be different each time. >>>>>>>>>>>>> >>>>>>>>>>>>> On 07/02/2022 14:50, John Smith wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> I don't get it? I provided the reproducer. I implemented the >>>>>>>>>>>>> interface to Key selector it needs hashcode and equals as well? >>>>>>>>>>>>> >>>>>>>>>>>>> I'm attempting to do click stream. So the key is based on >>>>>>>>>>>>> processing date/time rounded to the minute + domain name + path >>>>>>>>>>>>> >>>>>>>>>>>>> So these should be valid below? >>>>>>>>>>>>> >>>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1 >>>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1 >>>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article1 >>>>>>>>>>>>> >>>>>>>>>>>>> 2022-01-01T10:02:00 + cnn.com + /article2 >>>>>>>>>>>>> >>>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1 >>>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article1 >>>>>>>>>>>>> >>>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3 >>>>>>>>>>>>> 2022-01-01T10:03:00 + cnn.com + /article3 >>>>>>>>>>>>> >>>>>>>>>>>>> On Mon., Feb. 7, 2022, 2:53 a.m. 
Chesnay Schepler, < >>>>>>>>>>>>> ches...@apache.org> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Don't KeySelectors also need to be deterministic? >>>>>>>>>>>>>> >>>>>>>>>>>>>> * The {@link KeySelector} allows to use deterministic objects >>>>>>>>>>>>>> for operations such as reduce,* reduceGroup, join, coGroup, etc. >>>>>>>>>>>>>> *If invoked multiple times on the same object, the returned >>>>>>>>>>>>>> key*** must be the same.* >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 04/02/2022 18:25, John Smith wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Francesco, here is the reproducer: >>>>>>>>>>>>>> https://github.com/javadevmtl/flink-key-reproducer >>>>>>>>>>>>>> >>>>>>>>>>>>>> So, essentially it looks like when there's a high influx of >>>>>>>>>>>>>> records produced from the source that the Exception is thrown. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The key is generated by 3 values: date/time rounded to the >>>>>>>>>>>>>> minute and 2 strings. >>>>>>>>>>>>>> So you will see keys as follows... >>>>>>>>>>>>>> 2022-02-04T17:20:00Z|foo|bar >>>>>>>>>>>>>> 2022-02-04T17:21:00Z|foo|bar >>>>>>>>>>>>>> 2022-02-04T17:22:00Z|foo|bar >>>>>>>>>>>>>> >>>>>>>>>>>>>> The reproducer has a custom source that basically produces a >>>>>>>>>>>>>> record in a loop and sleeps for a specified period of >>>>>>>>>>>>>> milliseconds 100ms in >>>>>>>>>>>>>> this case. >>>>>>>>>>>>>> The lower the sleep delay the faster records are produced the >>>>>>>>>>>>>> more chances the exception is thrown. With a 100ms delay it's >>>>>>>>>>>>>> always >>>>>>>>>>>>>> thrown. Setting a 2000 to 3000ms will guarantee it to work. >>>>>>>>>>>>>> The original job uses a Kafka Source so it should technically >>>>>>>>>>>>>> be able to handle even a couple thousand records per second. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 16:41, John Smith < >>>>>>>>>>>>>> java.dev....@gmail.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Ok it's not my data either. I think it may be a volume >>>>>>>>>>>>>>> issue. 
I have managed to consistently reproduce the error. I'll >>>>>>>>>>>>>>> upload a >>>>>>>>>>>>>>> reproducer ASAP. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 15:37, John Smith < >>>>>>>>>>>>>>> java.dev....@gmail.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Ok so I tried to create a reproducer but I couldn't >>>>>>>>>>>>>>>> reproduce it. But the actual job once in a while throws that >>>>>>>>>>>>>>>> error. So I'm >>>>>>>>>>>>>>>> wondering if maybe one of the records that comes in is not >>>>>>>>>>>>>>>> valid, though I >>>>>>>>>>>>>>>> do validate prior to getting to the key and window operators. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:32, John Smith < >>>>>>>>>>>>>>>> java.dev....@gmail.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Actually maybe not because with PrintSinkFunction it ran >>>>>>>>>>>>>>>>> for a bit and then it threw the error. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Thu, 3 Feb 2022 at 14:24, John Smith < >>>>>>>>>>>>>>>>> java.dev....@gmail.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Ok it may be the ElasticSearch connector causing the >>>>>>>>>>>>>>>>>> issue? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> If I use PrintSinkFunction then I get no error and my >>>>>>>>>>>>>>>>>> stats print as expected. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Wed, 2 Feb 2022 at 03:01, Francesco Guardiani < >>>>>>>>>>>>>>>>>> france...@ververica.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>> your hash code and equals seems correct. Can you post a >>>>>>>>>>>>>>>>>>> minimum stream pipeline reproducer using this class? 
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> FG
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Feb 1, 2022 at 8:39 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
>>>>>>>>>>>>>>>>>>>> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
>>>>>>>>>>>>>>>>>>>> using low level state access APIs, this is most likely caused by
>>>>>>>>>>>>>>>>>>>> non-deterministic shuffle key (hashCode and equals implementation).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This is my class, is my hashCode deterministic?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> public final class MyEventCountKey {
>>>>>>>>>>>>>>>>>>>>     private final String countDateTime;
>>>>>>>>>>>>>>>>>>>>     private final String domain;
>>>>>>>>>>>>>>>>>>>>     private final String event;
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     public MyEventCountKey(final String countDateTime, final String domain, final String event) {
>>>>>>>>>>>>>>>>>>>>         this.countDateTime = countDateTime;
>>>>>>>>>>>>>>>>>>>>         this.domain = domain;
>>>>>>>>>>>>>>>>>>>>         this.event = event;
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     public String getCountDateTime() {
>>>>>>>>>>>>>>>>>>>>         return countDateTime;
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     public String getDomain() {
>>>>>>>>>>>>>>>>>>>>         return domain;
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     public String getEven() {
>>>>>>>>>>>>>>>>>>>>         return event;
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>     public String toString() {
>>>>>>>>>>>>>>>>>>>>         return countDateTime + "|" + domain + "|" + event;
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>     public boolean equals(Object o) {
>>>>>>>>>>>>>>>>>>>>         if (this == o) return true;
>>>>>>>>>>>>>>>>>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>>>>>>>>>>>>>>>>>         MyEventCountKey that = (MyEventCountKey) o;
>>>>>>>>>>>>>>>>>>>>         return countDateTime.equals(that.countDateTime) &&
>>>>>>>>>>>>>>>>>>>>                 domain.equals(that.domain) &&
>>>>>>>>>>>>>>>>>>>>                 event.equals(that.event);
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>     public int hashCode() {
>>>>>>>>>>>>>>>>>>>>         final int prime = 31;
>>>>>>>>>>>>>>>>>>>>         int result = 1;
>>>>>>>>>>>>>>>>>>>>         result = prime * result + countDateTime.hashCode();
>>>>>>>>>>>>>>>>>>>>         result = prime * result + domain.hashCode();
>>>>>>>>>>>>>>>>>>>>         result = prime * result + event.hashCode();
>>>>>>>>>>>>>>>>>>>>         return result;
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>