Hi Yuval,

Sorry that nobody replied earlier. Somehow your email fell through the cracks.

If I understand you correctly, you would like to implement a table source that implements both `SupportsWatermarkPushDown` and `SupportsFilterPushDown`?

The current behavior might be on purpose. Filters and watermarks are not very compatible. Filtering inside the source also means that records (from which watermarks could be generated) are skipped. If the filter is very strict, we would not generate any new watermarks and the pipeline would stop making progress in event time.
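
To make the concern concrete, here is a minimal, self-contained sketch. It does not use any Flink API: `Event`, `watermarkAfter` and `sampleEvents` are hypothetical names, and the "watermark" is simply the highest timestamp among the records that survive the filter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class FilterBeforeWatermark {

    /** Hypothetical record type: a key plus an event-time timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /**
     * Simplified watermark generator (an assumption, not Flink's implementation):
     * the watermark is the highest timestamp among the records that pass the filter.
     * Records dropped by the filter never reach the generator.
     */
    static long watermarkAfter(List<Event> events, Predicate<Event> filter) {
        long watermark = Long.MIN_VALUE; // stands for "no watermark emitted yet"
        for (Event e : events) {
            if (filter.test(e)) {
                watermark = Math.max(watermark, e.timestamp);
            }
        }
        return watermark;
    }

    /** Sample input: only the first record has the key the strict filter accepts. */
    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("bar", 1_000L),
                new Event("other", 2_000L),
                new Event("other", 3_000L),
                new Event("other", 4_000L));
    }

    public static void main(String[] args) {
        List<Event> events = sampleEvents();
        long unfiltered = watermarkAfter(events, e -> true);
        long filtered = watermarkAfter(events, e -> e.key.equals("bar"));
        System.out.println("watermark without filter:     " + unfiltered);
        System.out.println("watermark with strict filter: " + filtered);
    }
}
```

In this toy model the unfiltered stream advances the watermark to 4000, while the strictly filtered stream stays at 1000 even though three more records arrived.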

Watermark push down is only necessary if per-partition watermarks are required. Otherwise, the watermarks are generated in a subsequent operator after the source. So you can still use rowtime without implementing `SupportsWatermarkPushDown` in your custom source.
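
The difference between the two placements can be sketched roughly as follows. This is a simplified model, not Flink code: all class and method names are hypothetical, and it ignores out-of-orderness bounds and idle partitions. It assumes that per-partition generation tracks one maximum timestamp per partition and combines them by taking the minimum, while a single generator after the source only sees the merged stream.

```java
import java.util.Arrays;
import java.util.List;

final class PerPartitionWatermarks {

    /** Highest timestamp seen in one sequence of event timestamps. */
    static long maxTimestamp(List<Long> timestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    /** Per-partition model: one maximum per partition, combined by taking the minimum. */
    static long perPartitionWatermark(List<List<Long>> partitions) {
        long min = Long.MAX_VALUE;
        for (List<Long> partition : partitions) {
            min = Math.min(min, maxTimestamp(partition));
        }
        return min;
    }

    /** After-source model: a single generator tracks one maximum over the merged stream. */
    static long afterSourceWatermark(List<List<Long>> partitions) {
        long max = Long.MIN_VALUE;
        for (List<Long> partition : partitions) {
            max = Math.max(max, maxTimestamp(partition));
        }
        return max;
    }

    /** Sample input: one partition is far ahead of the other in event time. */
    static List<List<Long>> samplePartitions() {
        List<Long> fast = Arrays.asList(10_000L, 11_000L, 12_000L);
        List<Long> slow = Arrays.asList(1_000L, 2_000L);
        return Arrays.asList(fast, slow);
    }

    public static void main(String[] args) {
        List<List<Long>> partitions = samplePartitions();
        System.out.println("per-partition watermark: " + perPartitionWatermark(partitions));
        System.out.println("after-source watermark:  " + afterSourceWatermark(partitions));
    }
}
```

With this input the per-partition model holds the watermark at 2000 until the slow partition catches up, whereas the single generator jumps to 12000, so later records from the slow partition could be treated as late. If your partitions progress at roughly the same pace, the simpler after-source placement is usually sufficient.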

I will loop in Shengkai, who worked on this topic recently.

Regards,
Timo


On 04.03.21 18:52, Yuval Itzchakov wrote:
Bumping this up again, would appreciate any help if anyone is familiar with the blink planner.

Thanks,
Yuval.

On Fri, Feb 26, 2021, 18:53 Yuval Itzchakov <yuva...@gmail.com <mailto:yuva...@gmail.com>> wrote:

    Hi Jark,
    Would appreciate your help with this.

    On Wed, Feb 24, 2021 at 12:09 PM Roman Khachatryan <ro...@apache.org
    <mailto:ro...@apache.org>> wrote:

        Hi Yuval,

        I'm not familiar with the Blink planner but probably Jark can help.

        Regards,
        Roman


        On Sun, Feb 21, 2021 at 6:52 PM Yuval Itzchakov
        <yuva...@gmail.com <mailto:yuva...@gmail.com>> wrote:

            Update: When I don't set the watermark explicitly on the
            TableSchema, `applyWatermarkStrategy` never gets called on
            my ScanTableSource, which does make sense. But now the
            question is what should be done? This feels a bit unintuitive.

            On Sun, Feb 21, 2021 at 7:09 PM Yuval Itzchakov
            <yuva...@gmail.com <mailto:yuva...@gmail.com>> wrote:

                Hi,
                Flink 1.12.1, Blink Planner, Scala 2.12

                I have the following logical plan:

                LogicalSink(table=[default_catalog.default_database.table], fields=[bar, baz, hello_world, a, b])
                +- LogicalProject(value=[$2], bar=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], baz=[CAST(CAST($0):TIMESTAMP(3)):TIMESTAMP(6)], hello_world=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], a=[null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], b=[EMPTY_MAP()])
                   +- LogicalFilter(condition=[AND(=($4, _UTF-16LE'bar'), =($34, _UTF-16LE'baz'))])
                      +- LogicalWatermarkAssigner(rowtime=[bar], watermark=[$0])
                         +- LogicalTableScan(table=[[default_catalog, default_database, foo]])

                I have a custom source which creates a TableSchema based
                on an external table. When I create the schema, I push
                the watermark definition to the schema:

                image.png

                When the HepPlanner starts the optimization phase and
                reaches the "PushFilterIntoTableSourceScanRule", it
                matches on the LogicalFilter in the definition. But
                then, since the RelOptRuleOperandChildPolicy is set to
                "SOME", it attempts to do a full match on the child
                nodes. The rule is defined like so:

                image.png

                The child match fails since the immediate child of the
                filter is a "LogicalWatermarkAssigner", and not the
                "LogicalTableScan", which is the grandchild:

                image.png

                Is this the desired behavior? Should I create the
                TableSchema without the row time attribute and use
                "SupportsWatermarkPushDown" to generate the watermark
                dynamically from the source record?
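
A minimal sketch of the mismatch described in this message, written as a toy model rather than against Calcite's or Flink's API. `Node`, `Operand` and `matches` are hypothetical names, node types are plain strings, and the matching is simplified to "the operand's child must match a direct child of the node".

```java
import java.util.Arrays;
import java.util.List;

final class OperandMatchSketch {

    /** Hypothetical plan node: a type name plus its direct children. */
    static final class Node {
        final String type;
        final List<Node> children;

        Node(String type, Node... children) {
            this.type = type;
            this.children = Arrays.asList(children);
        }
    }

    /** Hypothetical rule operand: an expected type and, optionally, an expected direct child. */
    static final class Operand {
        final String type;
        final Operand child; // null means "any children are accepted"

        Operand(String type, Operand child) {
            this.type = type;
            this.child = child;
        }
    }

    /** Matches only direct children, so a grandchild of the right type does not count. */
    static boolean matches(Operand operand, Node node) {
        if (!operand.type.equals(node.type)) {
            return false;
        }
        if (operand.child == null) {
            return true;
        }
        for (Node child : node.children) {
            if (matches(operand.child, child)) {
                return true;
            }
        }
        return false;
    }

    /** Pattern assumed for the rule: a filter directly on top of a table scan. */
    static Operand filterOnScan() {
        return new Operand("LogicalFilter", new Operand("LogicalTableScan", null));
    }

    public static void main(String[] args) {
        Node scan = new Node("LogicalTableScan");
        Node withAssigner = new Node("LogicalFilter", new Node("LogicalWatermarkAssigner", scan));
        Node direct = new Node("LogicalFilter", scan);

        System.out.println("filter -> assigner -> scan: " + matches(filterOnScan(), withAssigner));
        System.out.println("filter -> scan:             " + matches(filterOnScan(), direct));
    }
}
```

In this model the plan with the watermark assigner between filter and scan does not match, while the plan with the scan as the direct child does.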

                -- Best Regards,
                Yuval Itzchakov.



            -- Best Regards,
            Yuval Itzchakov.



    -- Best Regards,
    Yuval Itzchakov.

