[ 
https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-5858:
---------------------------------

    Assignee: godfrey he

> Support multiple sinks in same execution DAG
> --------------------------------------------
>
>                 Key: FLINK-5858
>                 URL: https://issues.apache.org/jira/browse/FLINK-5858
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: godfrey he
>
> When writeToSink is called to write a Table (backed by a TableSource) to a
> TableSink, the Table is translated to a DataSet or DataStream. If
> writeToSink is called more than once (to write to different sinks), the
> Table is translated once per call, so the final execution graph is split
> into separate DAGs. For example:
> {code:title=Example.scala|borderStyle=solid}
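>     // Imports assume the Flink 1.2-era package layout.
>     import org.apache.flink.api.common.typeinfo.BasicTypeInfo
>     import org.apache.flink.api.scala._
>     import org.apache.flink.table.api.TableEnvironment
>     import org.apache.flink.table.api.scala._
>     import org.apache.flink.table.sinks.CsvTableSink
>     import org.apache.flink.table.sources.CsvTableSource
>
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     // Source table with four '#'-delimited fields.
>     val csvTableSource = new CsvTableSource(
>       "/tmp/words",
>       Array("first", "id", "score", "last"),
>       Array(
>         BasicTypeInfo.STRING_TYPE_INFO,
>         BasicTypeInfo.INT_TYPE_INFO,
>         BasicTypeInfo.DOUBLE_TYPE_INFO,
>         BasicTypeInfo.STRING_TYPE_INFO
>       ),
>       fieldDelim = "#"
>     )
>     tEnv.registerTableSource("csv_source", csvTableSource)
>     // Sum the scores for each distinct value of "first".
>     val resultTable = tEnv.scan("csv_source")
>       .groupBy('first)
>       .select('first, 'score.sum)
>     // Each writeToSink call translates resultTable again.
>     resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
>     resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))
>     println(tEnv.explain(resultTable))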
> {code}
> Results:
> == Abstract Syntax Tree ==
> LogicalProject(first=[$0], TMP_1=[$1])
>   LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
>     LogicalProject(first=[$0], score=[$2])
>       LogicalTableScan(table=[[csv_source]])
> == Optimized Logical Plan ==
> DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
>   BatchTableSourceScan(table=[[csv_source]], fields=[first, score])
> == Physical Execution Plan ==
> {color:red}
> Stage 6 : Data Source
> {color}
>       content : collect elements with CollectionInputFormat
>       Partitioning : RANDOM_PARTITIONED
>       Stage 5 : Map
>               content : prepare select: (first, SUM(score) AS TMP_0)
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               driver_strategy : Map
>               Partitioning : RANDOM_PARTITIONED
>               Stage 4 : GroupCombine
>                       content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>                       ship_strategy : Forward
>                       exchange_mode : PIPELINED
>                       driver_strategy : Sorted Combine
>                       Partitioning : RANDOM_PARTITIONED
>                       Stage 3 : GroupReduce
>                               content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>                               ship_strategy : Hash Partition on [0]
>                               exchange_mode : PIPELINED
>                               driver_strategy : Sorted Group Reduce
>                               Partitioning : RANDOM_PARTITIONED
>                               Stage 2 : Map
>                                       content : to: Row(f0: String, f1: Double)
>                                       ship_strategy : Forward
>                                       exchange_mode : PIPELINED
>                                       driver_strategy : Map
>                                       Partitioning : RANDOM_PARTITIONED
>                                       Stage 1 : Map
>                                               content : Map at emitDataSet(CsvTableSink.scala:67)
>                                               ship_strategy : Forward
>                                               exchange_mode : PIPELINED
>                                               driver_strategy : Map
>                                               Partitioning : RANDOM_PARTITIONED
>                                               Stage 0 : Data Sink
>                                                       content : TextOutputFormat (/tmp/wordcount1) - UTF-8
>                                                       ship_strategy : Forward
>                                                       exchange_mode : PIPELINED
>                                                       Partitioning : RANDOM_PARTITIONED
> {color:red}
> Stage 13 : Data Source
> {color}
>       content : collect elements with CollectionInputFormat
>       Partitioning : RANDOM_PARTITIONED
>       Stage 12 : Map
>               content : prepare select: (first, SUM(score) AS TMP_0)
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               driver_strategy : Map
>               Partitioning : RANDOM_PARTITIONED
>               Stage 11 : GroupCombine
>                       content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>                       ship_strategy : Forward
>                       exchange_mode : PIPELINED
>                       driver_strategy : Sorted Combine
>                       Partitioning : RANDOM_PARTITIONED
>                       Stage 10 : GroupReduce
>                               content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>                               ship_strategy : Hash Partition on [0]
>                               exchange_mode : PIPELINED
>                               driver_strategy : Sorted Group Reduce
>                               Partitioning : RANDOM_PARTITIONED
>                               Stage 9 : Map
>                                       content : to: Row(f0: String, f1: Double)
>                                       ship_strategy : Forward
>                                       exchange_mode : PIPELINED
>                                       driver_strategy : Map
>                                       Partitioning : RANDOM_PARTITIONED
>                                       Stage 8 : Map
>                                               content : Map at emitDataSet(CsvTableSink.scala:67)
>                                               ship_strategy : Forward
>                                               exchange_mode : PIPELINED
>                                               driver_strategy : Map
>                                               Partitioning : RANDOM_PARTITIONED
>                                               Stage 7 : Data Sink
>                                                       content : TextOutputFormat (/tmp/wordcount2) - UTF-8
>                                                       ship_strategy : Forward
>                                                       exchange_mode : PIPELINED
>                                                       Partitioning : RANDOM_PARTITIONED
> {color:red}
> Stage 18 : Data Source
> {color}
>       content : collect elements with CollectionInputFormat
>       Partitioning : RANDOM_PARTITIONED
>       Stage 17 : Map
>               content : prepare select: (first, SUM(score) AS TMP_0)
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               driver_strategy : Map
>               Partitioning : RANDOM_PARTITIONED
>               Stage 16 : GroupCombine
>                       content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>                       ship_strategy : Forward
>                       exchange_mode : PIPELINED
>                       driver_strategy : Sorted Combine
>                       Partitioning : RANDOM_PARTITIONED
>                       Stage 15 : GroupReduce
>                               content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>                               ship_strategy : Hash Partition on [0]
>                               exchange_mode : PIPELINED
>                               driver_strategy : Sorted Group Reduce
>                               Partitioning : RANDOM_PARTITIONED
>                               Stage 14 : Data Sink
>                                       content : org.apache.flink.api.java.io.DiscardingOutputFormat
>                                       ship_strategy : Forward
>                                       exchange_mode : PIPELINED
>                                       Partitioning : RANDOM_PARTITIONED
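
The duplication described in the issue can be illustrated with a small toy model. This is only a sketch in plain Scala, not Flink code: Operator, Translator, translatePerSink and translateOnce are hypothetical names invented for the illustration, and the three-operator chain is a simplified stand-in for the plan above. It compares how many operators exist when the same logical plan is translated once per sink with how many exist when a single translation is shared by all sinks.

```scala
// Toy model, NOT Flink code: all names here are hypothetical.
object SharedTranslationSketch {
  // A physical operator with a unique id and an optional upstream input.
  final case class Operator(id: Int, name: String, input: Option[Operator])

  // Builds operators and counts how many have been created so far.
  final class Translator {
    private var nextId = 0

    private def newOperator(name: String, input: Option[Operator]): Operator = {
      val operator = Operator(nextId, name, input)
      nextId += 1
      operator
    }

    // Every call builds a fresh source -> combine -> reduce chain.
    def translate(): Operator = {
      val source = newOperator("Data Source", None)
      val combine = newOperator("GroupCombine", Some(source))
      newOperator("GroupReduce", Some(combine))
    }

    // Attaches a sink to an already translated operator.
    def addSink(path: String, input: Operator): Operator =
      newOperator(s"Data Sink ($path)", Some(input))

    def operatorCount: Int = nextId
  }

  // Translate the plan again for every sink: one separate DAG per sink.
  def translatePerSink(paths: Seq[String]): Int = {
    val translator = new Translator
    paths.foreach(path => translator.addSink(path, translator.translate()))
    translator.operatorCount
  }

  // Translate the plan once and attach every sink to it: one shared DAG.
  def translateOnce(paths: Seq[String]): Int = {
    val translator = new Translator
    val result = translator.translate()
    paths.foreach(path => translator.addSink(path, result))
    translator.operatorCount
  }

  def main(args: Array[String]): Unit = {
    val sinks = Seq("/tmp/wordcount1", "/tmp/wordcount2")
    println(s"per-sink translation: ${translatePerSink(sinks)} operators") // 8
    println(s"shared translation:   ${translateOnce(sinks)} operators")    // 5
  }
}
```

With two sinks, the per-sink variant creates two full chains (eight operators in total), while the shared variant creates one chain plus two sinks (five operators). Source, combine and reduce would then run once instead of once per sink.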



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
