Hi Benoît, Do you mean if you register one TableSource, and add two sinks from the same TableSource, the source will duplicate ? If so, maybe you can check *TableEnvironmentImpl.isEagerOperationTranslation*, it's *false* by default. But in *StreamTableEnvironmentImpl*, it's *true* because we need eager translation to keep alignment with DataStream Api. If you don't need Table <-> DataStream translation, you can just use TableEnvironmentImpl instead of StreamTableEnvironmentImpl to achieve your goal.
Hope it helps. Benoît Paris <benoit.pa...@centraliens-lille.org> 于2020年1月23日周四 上午6:50写道: > Hello all! > > I'm having a problem with TableSources' DataStream being duplicated when > pulled on from 2 sinks. > > I understand that sometimes the best plan might just be to duplicate and > read both times a TableSource/SourceFunction; but in my case I can't quite > reproduce the data as say Kafka would. I just need the SourceFunction and > DataStream provided by the TableSource to not be duplicated. > > As a workaround to this issue, I introduce some sort of materialization > barrier that makes the planner pull only on one instance of the > TableSource/SourceFunction: > Instead of: > > tEnv.registerTableSource("foo_table", new FooTableSource()); > > I convert it to an Append Stream, and back again to a Table: > > tEnv.registerTableSource("foo_table_source", new FooTableSource()); > Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source"); > Table appendingSourceTable = tEnv.fromDataStream( > tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, > new TypeInformation[]{Types.LONG()})) > ); > tEnv.registerTable("foo_table", appendingSourceTable); > > And the conversion to an Append Stream somewhat makes the planner behave > and there is only one DataSource in the execution plan. > > But I'm feeling like I'm just missing a simple option (on the > SourceFunction, or on the TableSource?) to invoke and declare the Source as > being non duplicateable. > > I have tried a lot of options (uid(), operation chaining restrictions, > twiddling the transformation, forceNonParallel(), etc.), but can't find > quite how to do that! My SourceFunction is a RichSourceFunction > > At this point I'm wondering if this is a bug, or if it is a feature that > would have to be implemented. > > Cheers, > Ben > > > > > > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn