Hey Timo,
Hah, that's a fair point about using time. I guess I should update my
statement to "as a user, I don't want to worry about /manually managing/
time".
That's a nice suggestion with the KeyedProcessFunction and no windows,
I'll give that a shot. If I don't want to emit any duplicates, I'd have
to essentially buffer the "last seen duplicate" for each key in that
process function until the MAX_WATERMARK is sent through though, right?
I could emit early results if I assume the max number of possible
duplicates, but for records with no duplicates, I'd have to wait until
no more records are coming -- am I missing something?
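Concretely, the buffering I have in mind would look something like this in plain Java -- just a sketch of the state a deduplicating KeyedProcessFunction would keep, with the Dedup class and String records as made-up stand-ins for the real keyed state and record types:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of the buffering a deduplicating KeyedProcessFunction
 * would do; Dedup and its String records are made-up stand-ins. In Flink,
 * lastSeen would be per-key ValueState, and flush() would be the onTimer()
 * callback fired by the MAX_WATERMARK once input ends.
 */
class Dedup {
    // Keyed by the dedup key; keeps only the most recent record per key.
    private final Map<String, String> lastSeen = new LinkedHashMap<>();

    /** processElement(): overwrite whatever was buffered for this key. */
    void process(String key, String record) {
        lastSeen.put(key, record);
    }

    /** onTimer() at MAX_WATERMARK: no more input, emit one record per key. */
    List<String> flush() {
        return new ArrayList<>(lastSeen.values());
    }
}

public class Main {
    public static void main(String[] args) {
        Dedup dedup = new Dedup();
        dedup.process("a", "a-1");
        dedup.process("b", "b-1");
        dedup.process("a", "a-2"); // duplicate key: replaces a-1, nothing emitted yet
        System.out.println(dedup.flush()); // prints [a-2, b-1]
    }
}
```

So even records with no duplicates would sit in state until the end of input, since there's no earlier point at which we know no duplicate is coming.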
Thanks so much,
Austin
On Fri, Oct 9, 2020 at 10:44 AM Timo Walther <twal...@apache.org> wrote:
Hi Austin,
if you don't want to worry about time at all, you should probably not
use any windows because those are a time-based operation.
A nicer solution could be to use a pure KeyedProcessFunction and
implement the deduplication logic without using windows at all. In a
ProcessFunction you can register an event-time timer. That timer is
triggered by the MAX_WATERMARK when the pipeline shuts down, even if no
timestamp is assigned in the StreamRecord. As far as I know, the
watermark leaves SQL even without a time attribute.
Regards,
Timo
On 08.10.20 17:38, Austin Cawley-Edwards wrote:
> Hey Timo,
>
> Sorry for the delayed reply. I'm using the Blink planner and
> non-time-based joins. I've got an example repo here that shows my
> query/setup [1]. It's got the manual timestamp assignment commented
> out for now, but that does indeed solve the issue.
>
> I'd really like to not worry about time at all in this job hah -- I
> started just using processing time, but Till pointed out that
> processing-time timers won't be fired when input ends, which is the
> case for this streaming job processing CSV files, so I should be
> using event time. With that suggestion, I switched to ingestion time,
> where I then discovered the issue converting from SQL to data stream.
>
> IMO, as a user, manually assigning timestamps on conversion makes
> sense if you're using event time and already handling time attributes
> yourself, but for ingestion time you really don't want to think about
> time at all, which is why it might make sense to propagate the
> automatically assigned timestamps in that case. Though I'm not sure
> how difficult that would be. Let me know what you think!
>
>
> Best + thanks again,
> Austin
>
> [1]: https://github.com/austince/flink-1.10-sql-windowing-error
>
> On Mon, Oct 5, 2020 at 4:24 AM Timo Walther <twal...@apache.org> wrote:
>
> Btw which planner are you using?
>
> Regards,
> Timo
>
> On 05.10.20 10:23, Timo Walther wrote:
> > Hi Austin,
> >
> > could you share some details of your SQL query with us? The reason
> > why I'm asking is that I guess the rowtime field is not inserted
> > into the `StreamRecord` of the DataStream API. The rowtime field is
> > only inserted if there is a single field in the output of the query
> > that is a valid "time attribute".
> >
> > Especially after non-time-based joins and aggregations, time
> > attributes lose their properties and become regular timestamps,
> > because timestamps and watermarks might have diverged.
> >
> > If you know what you're doing, you can also assign the timestamp
> > manually after `toRetractStream` using
> > `assignTimestampsAndWatermarks` and reinsert the field into the
> > stream record. But before you do that, I think it is better to
> > share more information about the query with us.
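To illustrate the watermark side of that suggestion: the bookkeeping such an assigner performs can be sketched outside Flink's API as a watermark that trails the highest timestamp seen by a fixed out-of-orderness bound. The class and method names below are illustrative, not Flink's:

```java
/**
 * Sketch of the bookkeeping a bounded-out-of-orderness timestamp/watermark
 * assigner performs. WatermarkTracker is a made-up name, not Flink's API;
 * it only illustrates the logic one would restore after toRetractStream.
 */
class WatermarkTracker {
    private final long maxOutOfOrderness;
    // Highest event timestamp observed so far; Long.MIN_VALUE until the
    // first event arrives, mirroring "no watermark yet".
    private long maxTimestampSeen = Long.MIN_VALUE;

    WatermarkTracker(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Called per element with the (reinserted) timestamp field. */
    void onEvent(long timestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
    }

    /** The watermark trails the highest timestamp by the allowed lateness. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrderness;
    }
}

public class Main {
    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(1000L);
        for (long ts : new long[] {5000L, 4200L, 7300L, 6900L}) {
            tracker.onEvent(ts); // out-of-order events only move the max forward
        }
        // max seen = 7300, bound = 1000
        System.out.println(tracker.currentWatermark()); // prints 6300
    }
}
```

With event time restored this way, the MAX_WATERMARK at the end of input will still fire any registered event-time timers.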
> >
> > I hope this helps.
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 05.10.20 09:25, Till Rohrmann wrote:
> >> Hi Austin,
> >>
> >> thanks for offering to help. First I would suggest asking Timo
> >> whether this is an aspect which is still missing or whether we
> >> overlooked it. Based on that we can then take the next steps.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
> >> <austin.caw...@gmail.com> wrote:
> >>
> >> Hey Till,
> >>
> >> Thanks for the notes. Yeah, the docs don't mention anything
> >> specific to this case, not sure if it's an uncommon one. Assigning
> >> timestamps on conversion does solve the issue. I'm happy to take a
> >> stab at implementing the feature if it is indeed missing and you
> >> all think it'd be worthwhile. I think it's definitely a confusing
> >> aspect of working w/ the Table & DataStream APIs together.
> >>
> >> Best,
> >> Austin
> >>
> >> On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann
> >> <trohrm...@apache.org> wrote:
> >>
> >> Hi Austin,
> >>
> >> yes, it should also work for ingestion time.
> >>
> >> I am not entirely sure whether event time is preserved when
> >> converting a Table into a retract stream. It should be possible
> >> and if it is not working, then I guess it is a missing feature.
> >> But I am sure that @Timo Walther <twal...@apache.org> knows more
> >> about it. In doubt, you could assign a new watermark generator
> >> when having obtained the retract stream.
> >>
> >> Here is also a link to some information about event time and
> >> watermarks [1]. Unfortunately, it does not state anything about
> >> the direction Table => DataStream.
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
> >>
> >>
> >> Cheers,
> >> Till
> >>
> >> On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards
> >> <austin.caw...@gmail.com> wrote:
> >>
> >> Hey Till,
> >>
> >> Just a quick question on time characteristics -- this should
> >> work for IngestionTime as well, correct? Is there anything
> >> special I need to do to have the CsvTableSource/toRetractStream
> >> call carry through the assigned timestamps, or do I have to
> >> re-assign timestamps during the conversion? I'm currently getting
> >> the `Record has Long.MIN_VALUE timestamp (= no timestamp marker)`
> >> error, though I'm seeing timestamps being assigned if I step
> >> through the AutomaticWatermarkContext.
> >>
> >> Thanks,
> >> Austin
> >>
> >> On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards
> >> <austin.caw...@gmail.com> wrote:
> >>
> >> Perfect, thanks so much Till!
> >>
> >> On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann
> >> <trohrm...@apache.org> wrote:
> >>
> >> Hi Austin,
> >>
> >> I believe that the problem is the processing time window. Unlike
> >> for event time, where we send a MAX_WATERMARK at the end of the
> >> stream to trigger all remaining windows, this does not happen for
> >> processing time windows. Hence, if your stream ends and you still
> >> have an open processing time window, then it will never get
> >> triggered.
> >>
> >> The problem should disappear if you use event time or if you
> >> process unbounded streams which never end.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Thu, Oct 1, 2020 at 12:01 AM Austin Cawley-Edwards
> >> <austin.caw...@gmail.com> wrote:
> >>
> >> Hey all,
> >>
> >> Thanks for your patience. I've got a small repo that reproduces
> >> the issue here:
> >>
> >> https://github.com/austince/flink-1.10-sql-windowing-error
> >>
> >> Not sure what I'm doing wrong but it feels silly.
> >>
> >> Thanks so much!
> >> Austin
> >>
> >> On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards
> >> <austin.caw...@gmail.com> wrote:
> >>
> >> Hey Till,
> >>
> >> Thanks for the reply -- I'll try to see if I can reproduce this
> >> in a small repo and share it with you.
> >>
> >> Best,
> >> Austin
> >>
> >> On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann
> >> <trohrm...@apache.org> wrote:
> >>
> >> Hi Austin,
> >>
> >> could you share with us the exact job you are running (including
> >> the custom window trigger)? This would help us to better
> >> understand your problem.
> >>
> >> I am also pulling in Klou and Timo who might help with the
> >> windowing logic and the Table to DataStream conversion.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards
> >> <austin.caw...@gmail.com> wrote:
> >>
> >> Hey all,
> >>
> >> I'm not sure if I've missed something in the docs, but I'm having
> >> a bit of trouble with a streaming SQL job that starts w/ raw SQL
> >> queries and then transitions to a more traditional streaming job.
> >> I'm on Flink 1.10 using the Blink planner, running locally with no
> >> checkpointing.
> >>
> >> The job looks roughly like:
> >>
> >> CSV 1 -->
> >> CSV 2 --> SQL Query to Join --> toRetractStream --> keyed time
> >> window w/ process func & custom trigger --> some other ops
> >> CSV 3 -->
> >>
> >> When I remove the windowing directly after the `toRetractStream`,
> >> the records make it to the "some other ops" stage, but with the
> >> windowing, those operations are sometimes not sent any data. I can
> >> also get data sent to the downstream operators by putting in a
> >> no-op map before the window and placing some breakpoints in there
> >> to manually slow down processing.
> >>
> >>
> >> The logs don't seem to indicate anything went wrong and generally
> >> look like:
> >>
> >> 4819 [Source: Custom File source (1/1)] INFO
> >> org.apache.flink.runtime.taskmanager.Task - Source: Custom File
> >> source (1/1) (3578629787c777320d9ab030c004abd4) switched from
> >> RUNNING to FINISHED.
> >> 4819 [Source: Custom File source (1/1)] INFO
> >> org.apache.flink.runtime.taskmanager.Task - Freeing task resources
> >> for Source: Custom File source (1/1)
> >> (3578629787c777320d9ab030c004abd4).
> >> 4819 [Source: Custom File source (1/1)] INFO
> >> org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem
> >> streams are closed for task Source: Custom File source (1/1)
> >> (3578629787c777320d9ab030c004abd4) [FINISHED]
> >> 4820 [flink-akka.actor.default-dispatcher-5] INFO
> >> org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering
> >> task and sending final execution state FINISHED to JobManager for
> >> task Source: Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
> >> ...
> >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >> ProcessWindowFunction$1) (1/1)] INFO
> >> org.apache.flink.runtime.taskmanager.Task -
> >> Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f)
> >> switched from RUNNING to FINISHED.
> >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >> ProcessWindowFunction$1) (1/1)] INFO
> >> org.apache.flink.runtime.taskmanager.Task - Freeing task resources
> >> for Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
> >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >> ProcessWindowFunction$1) (1/1)] INFO
> >> org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem
> >> streams are closed for task
> >> Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger,
> >> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f)
> >> [FINISHED]
> >> ...
> >> rest of the shutdown
> >> ...
> >> Program execution finished
> >> Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
> >> Job Runtime: 783 ms
> >>
> >>
> >> Is there something I'm missing in my setup? Could it be my custom
> >> window trigger? Bug? I'm stumped.
> >>
> >>
> >> Thanks,
> >> Austin
> >>