Hi Austin,

Your explanation of the KeyedProcessFunction implementation sounds good to me. Using the time and state primitives for this task will make the implementation more explicit and also more readable.

Let me know whether you were able to solve your use case.

Regards,
Timo


On 09.10.20 17:27, Austin Cawley-Edwards wrote:
Hey Timo,

Hah, that's a fair point about using time. I guess I should update my statement to "as a user, I don't want to worry about /manually managing/ time".

That's a nice suggestion with the KeyedProcessFunction and no windows, I'll give that a shot. If I don't want to emit any duplicates, I'd have to essentially buffer the "last seen duplicate" for each key in that process function until the MAX_WATERMARK is sent through though, right? I could emit early results if I assume the max number of possible duplicates, but for records with no duplicates, I'd have to wait until no more records are coming -- am I missing something?

Thanks so much,
Austin

On Fri, Oct 9, 2020 at 10:44 AM Timo Walther <twal...@apache.org> wrote:

    Hi Austin,

    if you don't want to worry about time at all, you should probably not
    use any windows, because those are time-based operations.

    A solution that would look a bit nicer could be to use a pure
    KeyedProcessFunction and implement the deduplication logic without
    using windows. In a ProcessFunction you can register an event-time
    timer. The timer will be triggered by the MAX_WATERMARK that is sent
    through the pipeline when it shuts down, even without a timestamp
    assigned in the StreamRecord. As far as I know, this watermark also
    propagates out of SQL even without a time attribute.
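
    A minimal sketch of this pattern could look like the following (the
    key type, record type, and state name are only assumptions, not taken
    from your job):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    public class LastSeenDeduplicator
            extends KeyedProcessFunction<String, Row, Row> {

        // Buffers the last record seen for the current key.
        private transient ValueState<Row> lastSeen;

        @Override
        public void open(Configuration parameters) {
            lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-seen", Row.class));
        }

        @Override
        public void processElement(
                Row value, Context ctx, Collector<Row> out) throws Exception {
            // Keep only the latest record per key.
            lastSeen.update(value);
            // A timer at Long.MAX_VALUE only fires when the MAX_WATERMARK
            // arrives at the end of the bounded input; re-registering the
            // same timestamp for a key is deduplicated by Flink.
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        }

        @Override
        public void onTimer(
                long timestamp, OnTimerContext ctx, Collector<Row> out)
                throws Exception {
            // Emit exactly one record per key once the input has ended.
            Row last = lastSeen.value();
            if (last != null) {
                out.collect(last);
                lastSeen.clear();
            }
        }
    }

    As you noted, this variant only emits results once no more records
    are coming; emitting earlier would require an assumption about the
    maximum number of duplicates per key.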

    Regards,
    Timo


    On 08.10.20 17:38, Austin Cawley-Edwards wrote:
     > Hey Timo,
     >
     > Sorry for the delayed reply. I'm using the Blink planner with
     > non-time-based joins. I've got an example repo here that shows my
     > query/setup [1]. It's got the manual timestamp assignment commented
     > out for now, but that does indeed solve the issue.
     >
     > I'd really like to not worry about time at all in this job, hah -- I
     > started out just using processing time, but Till pointed out that
     > processing-time timers won't be fired when input ends, which is the
     > case for this streaming job processing CSV files, so I should be
     > using event time. With that suggestion, I switched to ingestion
     > time, where I then discovered the issue converting from SQL to the
     > DataStream API.
     >
     > IMO, as a user, manually assigning timestamps on conversion makes
     > sense if you're using event time and already handling time
     > attributes yourself, but for ingestion time you really don't want
     > to think about time at all, which is why it might make sense to
     > propagate the automatically assigned timestamps in that case.
     > Though I'm not sure how difficult that would be. Let me know what
     > you think!
     >
     >
     > Best + thanks again,
     > Austin
     >
     > [1]: https://github.com/austince/flink-1.10-sql-windowing-error
     >
     > On Mon, Oct 5, 2020 at 4:24 AM Timo Walther <twal...@apache.org> wrote:
     >
     >     Btw which planner are you using?
     >
     >     Regards,
     >     Timo
     >
     >     On 05.10.20 10:23, Timo Walther wrote:
     >      > Hi Austin,
     >      >
     >      > could you share some details of your SQL query with us? The
     >      > reason why I'm asking is that I guess the rowtime field is
     >      > not inserted into the `StreamRecord` of the DataStream API.
     >      > The rowtime field is only inserted if there is a single field
     >      > in the output of the query that is a valid "time attribute".
     >      >
     >      > Especially after non-time-based joins and aggregations, time
     >      > attributes lose their properties and become regular
     >      > timestamps, because timestamps and watermarks might have
     >      > diverged.
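     >      >
     >      > For illustration (the table and column names here are made
     >      > up): after a regular, non-windowed join such as
     >      >
     >      >   SELECT o.id, o.rowtime
     >      >   FROM Orders o JOIN Customers c ON o.cid = c.id
     >      >
     >      > the `rowtime` column is materialized into a plain TIMESTAMP
     >      > and is no longer a time attribute, so no timestamp is set in
     >      > the StreamRecord when the result is converted to a DataStream.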
     >      >
     >      > If you know what you're doing, you can also assign the
     >      > timestamp manually via `assignTimestampsAndWatermarks` after
     >      > `toRetractStream` and reinsert the field into the stream
     >      > record. But before you do that, I think it is better to share
     >      > more information about the query with us.
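     >      >
     >      > A rough sketch of what that could look like (`tEnv`,
     >      > `resultTable`, the field index, the timestamp type, and the
     >      > out-of-orderness bound are all placeholder assumptions):
     >      >
     >      >   DataStream<Tuple2<Boolean, Row>> stream = tEnv
     >      >       .toRetractStream(resultTable, Row.class)
     >      >       .assignTimestampsAndWatermarks(
     >      >           new BoundedOutOfOrdernessTimestampExtractor<
     >      >                   Tuple2<Boolean, Row>>(Time.seconds(5)) {
     >      >               @Override
     >      >               public long extractTimestamp(Tuple2<Boolean, Row> e) {
     >      >                   // assumes the event time is a TIMESTAMP
     >      >                   // in field 0 of the Row
     >      >                   return ((java.sql.Timestamp) e.f1.getField(0))
     >      >                       .getTime();
     >      >               }
     >      >           });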
     >      >
     >      > I hope this helps.
     >      >
     >      > Regards,
     >      > Timo
     >      >
     >      >
     >      >
     >      > On 05.10.20 09:25, Till Rohrmann wrote:
     >      >> Hi Austin,
     >      >>
     >      >> thanks for offering to help. First I would suggest asking
     >      >> Timo whether this is an aspect which is still missing or
     >      >> whether we overlooked it. Based on that we can then take the
     >      >> next steps.
     >      >>
     >      >> Cheers,
     >      >> Till
     >      >>
     >      >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
     >      >> <austin.caw...@gmail.com> wrote:
     >      >>
     >      >>     Hey Till,
     >      >>
     >      >>     Thanks for the notes. Yeah, the docs don't mention
     >      >>     anything specific to this case, not sure if it's an
     >      >>     uncommon one. Assigning timestamps on conversion does
     >      >>     solve the issue. I'm happy to take a stab at
     >      >>     implementing the feature if it is indeed missing and you
     >      >>     all think it'd be worthwhile. I think it's definitely a
     >      >>     confusing aspect of working w/ the Table & DataStream
     >      >>     APIs together.
     >      >>
     >      >>     Best,
     >      >>     Austin
     >      >>
     >      >>     On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann
     >      >>     <trohrm...@apache.org> wrote:
     >      >>
     >      >>         Hi Austin,
     >      >>
     >      >>         yes, it should also work for ingestion time.
     >      >>
     >      >>         I am not entirely sure whether event time is
     >      >>         preserved when converting a Table into a retract
     >      >>         stream. It should be possible, and if it is not
     >      >>         working, then I guess it is a missing feature. But I
     >      >>         am sure that @Timo Walther knows more about it. In
     >      >>         doubt, you could assign a new watermark generator
     >      >>         once you have obtained the retract stream.
     >      >>
     >      >>         Here is also a link to some information about event
     >      >>         time and watermarks [1]. Unfortunately, it does not
     >      >>         state anything about the direction Table =>
     >      >>         DataStream.
     >      >>
     >      >>         [1]
     >      >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
     >      >>
     >      >>         Cheers,
     >      >>         Till
     >      >>
     >      >>         On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards
     >      >>         <austin.caw...@gmail.com> wrote:
     >      >>
     >      >>             Hey Till,
     >      >>
     >      >>             Just a quick question on time characteristics --
     >      >>             this should work for IngestionTime as well,
     >      >>             correct? Is there anything special I need to do
     >      >>             to have the CsvTableSource / toRetractStream call
     >      >>             carry through the assigned timestamps, or do I
     >      >>             have to re-assign timestamps during the
     >      >>             conversion? I'm currently getting the `Record has
     >      >>             Long.MIN_VALUE timestamp (= no timestamp marker)`
     >      >>             error, though I'm seeing timestamps being
     >      >>             assigned if I step through the
     >      >>             AutomaticWatermarkContext.
     >      >>
     >      >>             Thanks,
     >      >>             Austin
     >      >>
     >      >>             On Thu, Oct 1, 2020 at 10:52 AM Austin
     >      >>             Cawley-Edwards <austin.caw...@gmail.com> wrote:
     >      >>
     >      >>                 Perfect, thanks so much Till!
     >      >>
     >      >>                 On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann
     >      >>                 <trohrm...@apache.org> wrote:
     >      >>
     >      >>                     Hi Austin,
     >      >>
     >      >>                     I believe that the problem is the
     >      >>                     processing-time window. Unlike for event
     >      >>                     time, where we send a MAX_WATERMARK at
     >      >>                     the end of the stream to trigger all
     >      >>                     remaining windows, this does not happen
     >      >>                     for processing-time windows. Hence, if
     >      >>                     your stream ends and you still have an
     >      >>                     open processing-time window, then it
     >      >>                     will never get triggered.
     >      >>
     >      >>                     The problem should disappear if you use
     >      >>                     event time or if you process unbounded
     >      >>                     streams which never end.
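     >      >>
     >      >>                     (As a minimal sketch -- assuming a plain
     >      >>                     StreamExecutionEnvironment named `env` --
     >      >>                     switching to event-time semantics so that
     >      >>                     the final MAX_WATERMARK can fire the
     >      >>                     remaining windows:
     >      >>
     >      >>                     env.setStreamTimeCharacteristic(
     >      >>                         TimeCharacteristic.EventTime);
     >      >>
     >      >>                     IngestionTime works as well, since it
     >      >>                     uses event-time semantics under the
     >      >>                     hood.)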
     >      >>
     >      >>                     Cheers,
     >      >>                     Till
     >      >>
     >      >>                     On Thu, Oct 1, 2020 at 12:01 AM Austin
     >      >>                     Cawley-Edwards <austin.caw...@gmail.com> wrote:
     >      >>
     >      >>                         Hey all,
     >      >>
     >      >>                         Thanks for your patience. I've got a
     >      >>                         small repo that reproduces the issue
     >      >>                         here:
     >      >> https://github.com/austince/flink-1.10-sql-windowing-error
     >      >>
     >      >>                         Not sure what I'm doing wrong but it
     >      >>                         feels silly.
     >      >>
     >      >>                         Thanks so much!
     >      >>                         Austin
     >      >>
     >      >>                         On Tue, Sep 29, 2020 at 3:48 PM Austin
     >      >>                         Cawley-Edwards <austin.caw...@gmail.com> wrote:
     >      >>
     >      >>                             Hey Till,
     >      >>
     >      >>                             Thanks for the reply -- I'll try
     >      >>                             to see if I can reproduce this in
     >      >>                             a small repo and share it with
     >      >>                             you.
     >      >>
     >      >>                             Best,
     >      >>                             Austin
     >      >>
     >      >>                             On Tue, Sep 29, 2020 at 3:58 AM
     >      >>                             Till Rohrmann <trohrm...@apache.org> wrote:
     >      >>
     >      >>                                 Hi Austin,
     >      >>
     >      >>                                 could you share with us the
     >      >>                                 exact job you are running
     >      >>                                 (including the custom window
     >      >>                                 trigger)? This would help us
     >      >>                                 to better understand your
     >      >>                                 problem.
     >      >>
     >      >>                                 I am also pulling in Klou and
     >      >>                                 Timo, who might help with the
     >      >>                                 windowing logic and the Table
     >      >>                                 to DataStream conversion.
     >      >>
     >      >>                                 Cheers,
     >      >>                                 Till
     >      >>
     >      >>                                 On Mon, Sep 28, 2020 at
     >      >>                                 11:49 PM Austin Cawley-Edwards
     >      >>                                 <austin.caw...@gmail.com> wrote:
     >      >>
     >      >>                                     Hey all,
     >      >>
     >      >>                                     I'm not sure if I've
     >      >>                                     missed something in the
     >      >>                                     docs, but I'm having a
     >      >>                                     bit of trouble with a
     >      >>                                     streaming SQL job that
     >      >>                                     starts w/ raw SQL queries
     >      >>                                     and then transitions to a
     >      >>                                     more traditional
     >      >>                                     streaming job. I'm on
     >      >>                                     Flink 1.10 using the
     >      >>                                     Blink planner, running
     >      >>                                     locally with no
     >      >>                                     checkpointing.
     >      >>
     >      >>                                     The job looks roughly like:
     >      >>
     >      >>                                     CSV 1 -->
     >      >>                                     CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time window w/ process func & custom trigger --> some other ops
     >      >>                                     CSV 3 -->
     >      >>
     >      >>
     >      >>                                     When I remove the
     >      >>                                     windowing directly after
     >      >>                                     the `toRetractStream`,
     >      >>                                     the records make it to
     >      >>                                     the "some other ops"
     >      >>                                     stage, but with the
     >      >>                                     windowing, those
     >      >>                                     operations are sometimes
     >      >>                                     not sent any data. I can
     >      >>                                     also get data sent to the
     >      >>                                     downstream operators by
     >      >>                                     putting in a no-op map
     >      >>                                     before the window and
     >      >>                                     placing some breakpoints
     >      >>                                     in there to manually slow
     >      >>                                     down processing.
     >      >>
     >      >>                                     The logs don't seem to
     >      >>                                     indicate anything went
     >      >>                                     wrong and generally look
     >      >>                                     like:
     >      >>
     >      >> 4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to FINISHED.
     >      >> 4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
     >      >> 4819 [Source: Custom File source (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4) [FINISHED]
     >      >> 4820 [flink-akka.actor.default-dispatcher-5] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
     >      >> ...
     >      >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched from RUNNING to FINISHED.
     >      >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
     >      >> 4996 [Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Window(TumblingProcessingTimeWindows(60000), TimedCountTrigger, ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
     >      >> ...
     >      >> rest of the shutdown
     >      >> ...
     >      >> Program execution finished
     >      >> Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
     >      >> Job Runtime: 783 ms
     >      >>
     >      >>
     >      >>                                     Is there something I'm
     >      >>                                     missing in my setup?
     >      >>                                     Could it be my custom
     >      >>                                     window trigger? Bug? I'm
     >      >>                                     stumped.
     >      >>
     >      >>
     >      >>                                     Thanks,
     >      >>                                     Austin
     >      >>

