[ https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he reassigned FLINK-5858:
---------------------------------

    Assignee: godfrey he

> Support multiple sinks in same execution DAG
> --------------------------------------------
>
>                 Key: FLINK-5858
>                 URL: https://issues.apache.org/jira/browse/FLINK-5858
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: godfrey he
>
> When the writeToSink method is called to write a Table (backed by a TableSource)
> to a TableSink, the Table is translated into a DataSet or DataStream. If
> writeToSink is called more than once (to write to different sinks), the Table
> is also translated more than once, and the final execution graph is split into
> separate DAGs. For example:
> {code:title=Example.scala|borderStyle=solid}
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks.CsvTableSink
> import org.apache.flink.table.sources.CsvTableSource
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val csvTableSource = new CsvTableSource(
>   "/tmp/words",
>   Array("first", "id", "score", "last"),
>   Array(
>     BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.INT_TYPE_INFO,
>     BasicTypeInfo.DOUBLE_TYPE_INFO,
>     BasicTypeInfo.STRING_TYPE_INFO
>   ),
>   fieldDelim = "#"
> )
> tEnv.registerTableSource("csv_source", csvTableSource)
> val resultTable = tEnv.scan("csv_source")
>   .groupBy('first)
>   .select('first, 'score.sum)
> // each writeToSink call translates resultTable again
> resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
> resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))
> println(tEnv.explain(resultTable))
> {code}
> Results:
> == Abstract Syntax Tree ==
> LogicalProject(first=[$0], TMP_1=[$1])
>   LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
>     LogicalProject(first=[$0], score=[$2])
>       LogicalTableScan(table=[[csv_source]])
>
> == Optimized Logical Plan ==
> DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
>   BatchTableSourceScan(table=[[csv_source]], fields=[first, score])
>
> == Physical Execution Plan ==
> {color:red}
> Stage 6 : Data Source
> {color}
>   content : collect elements with CollectionInputFormat
>   Partitioning : RANDOM_PARTITIONED
>
>   Stage 5 : Map
>     content : prepare select: (first, SUM(score) AS TMP_0)
>     ship_strategy : Forward
>     exchange_mode : PIPELINED
>     driver_strategy : Map
>     Partitioning : RANDOM_PARTITIONED
>
>     Stage 4 : GroupCombine
>       content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>       ship_strategy : Forward
>       exchange_mode : PIPELINED
>       driver_strategy : Sorted Combine
>       Partitioning : RANDOM_PARTITIONED
>
>       Stage 3 : GroupReduce
>         content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>         ship_strategy : Hash Partition on [0]
>         exchange_mode : PIPELINED
>         driver_strategy : Sorted Group Reduce
>         Partitioning : RANDOM_PARTITIONED
>
>         Stage 2 : Map
>           content : to: Row(f0: String, f1: Double)
>           ship_strategy : Forward
>           exchange_mode : PIPELINED
>           driver_strategy : Map
>           Partitioning : RANDOM_PARTITIONED
>
>           Stage 1 : Map
>             content : Map at emitDataSet(CsvTableSink.scala:67)
>             ship_strategy : Forward
>             exchange_mode : PIPELINED
>             driver_strategy : Map
>             Partitioning : RANDOM_PARTITIONED
>
>             Stage 0 : Data Sink
>               content : TextOutputFormat (/tmp/wordcount1) - UTF-8
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               Partitioning : RANDOM_PARTITIONED
>
> {color:red}
> Stage 13 : Data Source
> {color}
>   content : collect elements with CollectionInputFormat
>   Partitioning : RANDOM_PARTITIONED
>
>   Stage 12 : Map
>     content : prepare select: (first, SUM(score) AS TMP_0)
>     ship_strategy : Forward
>     exchange_mode : PIPELINED
>     driver_strategy : Map
>     Partitioning : RANDOM_PARTITIONED
>
>     Stage 11 : GroupCombine
>       content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>       ship_strategy : Forward
>       exchange_mode : PIPELINED
>       driver_strategy : Sorted Combine
>       Partitioning : RANDOM_PARTITIONED
>
>       Stage 10 : GroupReduce
>         content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>         ship_strategy : Hash Partition on [0]
>         exchange_mode : PIPELINED
>         driver_strategy : Sorted Group Reduce
>         Partitioning : RANDOM_PARTITIONED
>
>         Stage 9 : Map
>           content : to: Row(f0: String, f1: Double)
>           ship_strategy : Forward
>           exchange_mode : PIPELINED
>           driver_strategy : Map
>           Partitioning : RANDOM_PARTITIONED
>
>           Stage 8 : Map
>             content : Map at emitDataSet(CsvTableSink.scala:67)
>             ship_strategy : Forward
>             exchange_mode : PIPELINED
>             driver_strategy : Map
>             Partitioning : RANDOM_PARTITIONED
>
>             Stage 7 : Data Sink
>               content : TextOutputFormat (/tmp/wordcount2) - UTF-8
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               Partitioning : RANDOM_PARTITIONED
>
> {color:red}
> Stage 18 : Data Source
> {color}
>   content : collect elements with CollectionInputFormat
>   Partitioning : RANDOM_PARTITIONED
>
>   Stage 17 : Map
>     content : prepare select: (first, SUM(score) AS TMP_0)
>     ship_strategy : Forward
>     exchange_mode : PIPELINED
>     driver_strategy : Map
>     Partitioning : RANDOM_PARTITIONED
>
>     Stage 16 : GroupCombine
>       content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>       ship_strategy : Forward
>       exchange_mode : PIPELINED
>       driver_strategy : Sorted Combine
>       Partitioning : RANDOM_PARTITIONED
>
>       Stage 15 : GroupReduce
>         content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>         ship_strategy : Hash Partition on [0]
>         exchange_mode : PIPELINED
>         driver_strategy : Sorted Group Reduce
>         Partitioning : RANDOM_PARTITIONED
>
>         Stage 14 : Data Sink
>           content : org.apache.flink.api.java.io.DiscardingOutputFormat
>           ship_strategy : Forward
>           exchange_mode : PIPELINED
>           Partitioning : RANDOM_PARTITIONED

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
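The duplicated translation described in the issue can be illustrated with a deliberately simplified model. Everything in this sketch is hypothetical: `Logical`, `Scan`, `Aggregate`, `PhysicalNode`, `Translator`, `MultiSinkDemo`, and the `share` flag are invented for illustration and are not Flink classes or options. Translating the plan separately for each sink creates two independent source-to-sink chains, whereas reusing the translation of a shared logical node attaches both sinks to a single DAG.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified logical plan (not Flink's RelNode tree):
// a table scan followed by an aggregation.
sealed trait Logical
final case class Scan(table: String) extends Logical
final case class Aggregate(child: Logical, description: String) extends Logical

// A physical operator; `id` plays the role of a stage number in the explain output.
final case class PhysicalNode(id: Int, name: String, inputs: List[Int])

// Hypothetical translator (not Flink's code) that builds one physical graph.
final class Translator {
  private var nextId = 0
  private val buffer = mutable.ArrayBuffer.empty[PhysicalNode]
  // Translation results per logical node; only read or written when sharing is enabled.
  private val cache = mutable.Map.empty[Logical, Int]

  def nodes: List[PhysicalNode] = buffer.toList

  // Creates a new physical node with a fresh id and returns that id.
  private def emit(name: String, inputs: List[Int]): Int = {
    val id = nextId
    nextId += 1
    buffer += PhysicalNode(id, name, inputs)
    id
  }

  // Without sharing, every call creates fresh physical nodes for the whole sub-plan.
  private def translate(plan: Logical, share: Boolean): Int =
    if (share && cache.contains(plan)) cache(plan)
    else {
      val id = plan match {
        case Scan(table)           => emit(s"Source($table)", Nil)
        case Aggregate(child, dsc) => emit(s"Aggregate($dsc)", List(translate(child, share)))
      }
      if (share) cache(plan) = id
      id
    }

  // Analogous to writeToSink: translate the plan, then attach a sink to the result.
  def writeToSink(plan: Logical, path: String, share: Boolean): Int =
    emit(s"Sink($path)", List(translate(plan, share)))
}

object MultiSinkDemo {
  // Writes the same plan to two sinks and returns the resulting physical nodes.
  def run(share: Boolean): List[PhysicalNode] = {
    val plan = Aggregate(Scan("csv_source"), "groupBy first, sum score")
    val t = new Translator
    t.writeToSink(plan, "/tmp/wordcount1", share)
    t.writeToSink(plan, "/tmp/wordcount2", share)
    t.nodes
  }

  // Each source node starts its own chain, so this approximates the number of DAGs here.
  def sourceCount(nodes: List[PhysicalNode]): Int =
    nodes.count(_.name.startsWith("Source"))
}
```

With `share = false`, `MultiSinkDemo.run` yields six physical nodes containing two sources, mirroring the two separate writeToSink DAGs in the plan above; with `share = true` it yields four nodes, with one source and one aggregation feeding both sinks. Caching by logical node is only one way to express the reuse the issue title asks for, not a description of how Flink eventually resolved it.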