Hi Benoît,

Do you mean that if you register one TableSource and attach two sinks to
that same TableSource, the source gets duplicated?
If so, you can check
*TableEnvironmentImpl.isEagerOperationTranslation*; it is *false* by
default. In *StreamTableEnvironmentImpl*, however, it is *true* because we
need eager translation to stay aligned with the DataStream API.
If you don't need Table <-> DataStream translation, you can just
use TableEnvironmentImpl instead of StreamTableEnvironmentImpl to achieve
your goal.
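
A rough, untested sketch of what I mean is below. It assumes Flink 1.9/1.10
and the Blink planner (my assumption: that is the planner that optimizes
several sinks together). FooTableSource and field_1 are taken from your
mail; sink_a, sink_b and the job name are placeholders.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SharedSourceJob {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()   // assumption: Blink planner
                .inStreamingMode()
                .build();

        // TableEnvironment.create(...) gives a plain TableEnvironmentImpl
        // (no DataStream bridge), so translation is not eager.
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.registerTableSource("foo_table", new FooTableSource());

        // Placeholder: register your two sinks as "sink_a" and "sink_b"
        // here, e.g. with tEnv.registerTableSink(...).

        // Both INSERTs are buffered and only translated on execute().
        tEnv.sqlUpdate("INSERT INTO sink_a SELECT field_1 FROM foo_table");
        tEnv.sqlUpdate("INSERT INTO sink_b SELECT field_1 FROM foo_table");

        tEnv.execute("shared-source-job");
    }
}
```

You can then check the execution plan to see whether only one source
remains.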

Hope it helps.

Benoît Paris <benoit.pa...@centraliens-lille.org> wrote on Thu, Jan 23, 2020 at 6:50 AM:

> Hello all!
>
> I'm having a problem with a TableSource's DataStream being duplicated when
> two sinks pull from it.
>
> I understand that sometimes the best plan might just be to duplicate a
> TableSource/SourceFunction and read it twice; but in my case I can't
> reproduce the data the way, say, Kafka would. I just need the SourceFunction
> and DataStream provided by the TableSource to not be duplicated.
>
> As a workaround to this issue, I introduce some sort of materialization
> barrier that makes the planner pull only on one instance of the
> TableSource/SourceFunction:
> Instead of:
>
> tEnv.registerTableSource("foo_table", new FooTableSource());
>
> I convert it to an Append Stream, and back again to a Table:
>
> tEnv.registerTableSource("foo_table_source", new FooTableSource());
> Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
> Table appendingSourceTable = tEnv.fromDataStream(
>         tEnv.toAppendStream(
>                 sourceTable,
>                 Types.ROW(new String[]{"field_1"}, new TypeInformation[]{Types.LONG()}))
> );
> tEnv.registerTable("foo_table", appendingSourceTable);
>
> And the conversion to an Append Stream somehow makes the planner behave,
> and there is only one DataSource in the execution plan.
>
> But I feel like I'm just missing a simple option (on the
> SourceFunction, or on the TableSource?) to invoke in order to declare the
> Source as non-duplicable.
>
> I have tried a lot of options (uid(), operation chaining restrictions,
> twiddling the transformation, forceNonParallel(), etc.), but I can't quite
> find how to do that! My SourceFunction is a RichSourceFunction.
>
> At this point I'm wondering if this is a bug, or if it is a feature that
> would have to be implemented.
>
> Cheers,
> Ben

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
