[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode
[ https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangzh updated FLINK-25471:
Description:
I have a DataStream with 6 rows of data like this:
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
then I convert it to a table with one column, "word",
then apply the SQL transform: select upper(word) from tmp_table,
then convert the result back to a DataStream,
then apply keyBy and sum.

In batch mode, I think the correct result is:
> (BOB,1)
> (ALICE,2)
> (LILY,3)
but the actual result is:
> (BOB,1)
If I set a different parallelism, the result is different.

The source file and pom.xml are attached.

was:
I have a DataStream with 6 rows of data like this:
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
then I convert it to a table with one column, "word",
then apply the SQL transform: select upper(word) from tmp_table,
then convert the result back to a DataStream,
then apply keyBy and sum.

In batch mode, I think the correct result is:
> (BOB,1)
> (ALICE,2)
> (LILY,3)
but the actual result is:
> (BOB,1)
If I set a different parallelism, the result is different.

> wrong result if table toDataStream then keyey by sum in Batch Mode
> ------------------------------------------------------------------
>
>                 Key: FLINK-25471
>                 URL: https://issues.apache.org/jira/browse/FLINK-25471
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.2
>            Reporter: zhangzh
>            Priority: Critical
>         Attachments: TableToDataStreamBatchWordCount.scala, pom.xml

--
This message was sent by Atlassian Jira
(v8.20.1#820001)

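The expected result stated in the description can be checked without Flink. The sketch below is plain Scala, not Flink code, and the object and method names are hypothetical. It upper-cases each word, as the SQL UPPER transform does, and then totals a count of 1 per upper-cased key, as keyBy(_._1).sum(1) would once all input has been consumed.

```scala
import java.util.Locale

// Plain Scala (no Flink dependency): computes the per-key totals the
// reporter expects from UPPER(word) followed by keyBy(word).sum(1).
object ExpectedWordCount {
  // Upper-case each word (as the SQL query does), pair it with 1,
  // then sum the 1s per upper-cased key.
  def countUpper(words: Seq[String]): Map[String, Int] =
    words
      .map(w => (w.toUpperCase(Locale.ROOT), 1))
      .groupBy(_._1)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val input = Seq("Alice", "alice", "Bob", "lily", "lily", "lily")
    // Sorted only to make the printout deterministic.
    countUpper(input).toSeq.sortBy(_._1).foreach(println)
  }
}
```

This prints (ALICE,2), (BOB,1) and (LILY,3). The sort exists only for a stable printout; a Flink job running with parallelism 7 gives no ordering guarantee across keys.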
[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode
[ https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangzh updated FLINK-25471:
Attachment: TableToDataStreamBatchWordCount.scala

[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode
[ https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangzh updated FLINK-25471:
Description:
I have a DataStream with 6 rows of data like this:
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
then I convert it to a table with one column, "word",
then apply the SQL transform: select upper(word) from tmp_table,
then convert the result back to a DataStream,
then apply keyBy and sum.

In batch mode, I think the correct result is:
> (BOB,1)
> (ALICE,2)
> (LILY,3)
but the actual result is:
> (BOB,1)
If I set a different parallelism, the result is different.

was:
I have 6 rows like this:
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
then I convert it to a table with one column, "word",
then apply the SQL transform: select upper(word) from tmp_table

[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode
[ https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangzh updated FLINK-25471:
Attachment: pom.xml

[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode
[ https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangzh updated FLINK-25471:
Description:
I have 6 rows like this:
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
then I convert it to a table with one column, "word",
then apply the SQL transform: select upper(word) from tmp_table

was:
I have 6 rows like this:
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
then I convert it to a table with one column, "word"

[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode
[ https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangzh updated FLINK-25471:
Description:
I have 6 rows like this:
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
then I convert it to a table with one column, "word"

was:
I have 6 rows like this:
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
then I convert it to a table with one column, "word"

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object TableToDataStreamBatchWordCount {
  def main(args: Array[String]): Unit = {
    // create env and tableEnv
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    // the reported wrong result occurs when BATCH mode is used instead:
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    env.setParallelism(7)
    val tableEnv = StreamTableEnvironment.create(env)

    // make data: 6 rows with a single STRING field
    val resultDS2 = env.fromElements(
      Row.of("Alice"),
      Row.of("alice"),
      Row.of("Bob"),
      Row.of("lily"),
      Row.of("lily"),
      Row.of("lily")
    )(Types.ROW(Types.STRING))

    // DataStream[Row] --> Table --> SQL UPPER transform
    val table = tableEnv.fromDataStream(resultDS2).as("word")
    tableEnv.createTemporaryView("tmp_table", table)
    val resultTable = tableEnv.sqlQuery("select UPPER(word) as word from tmp_table")

    // SQL-transformed table --> DataStream[String]
    val resultDs = tableEnv.toDataStream(resultTable).map(row => {
      row.getField("word").asInstanceOf[String]
    })

    // keyBy the word and sum the counts
    val counts: DataStream[(String, Int)] = resultDs
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    // print result
    counts.print()
    env.execute("WordCount")
  }
}

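The reporter expects exactly one output line per key. That matches Flink's documented BATCH execution-mode behavior: rolling aggregations such as sum() emit an update for every incoming record in STREAMING mode, but only the final result per key in BATCH mode. The plain-Scala model below (hypothetical names, no Flink APIs) contrasts the two on the upper-cased input. It ignores parallelism, so in a real job the streaming updates for different keys would interleave nondeterministically.

```scala
// Plain-Scala model of keyed rolling-sum output; this is not Flink code.
object RollingVsFinalSum {
  // STREAMING-style output: one (key, runningTotal) update per input record.
  def rollingUpdates(keys: Seq[String]): Seq[(String, Int)] =
    keys.foldLeft((Map.empty[String, Int], Vector.empty[(String, Int)])) {
      case ((totals, out), key) =>
        val next = totals.getOrElse(key, 0) + 1
        (totals.updated(key, next), out :+ (key -> next))
    }._2

  // BATCH-style output: only the last update per key is emitted.
  // toMap keeps the later entry whenever a key repeats.
  def finalResults(keys: Seq[String]): Map[String, Int] =
    rollingUpdates(keys).toMap

  def main(args: Array[String]): Unit = {
    val upper = Seq("ALICE", "ALICE", "BOB", "LILY", "LILY", "LILY")
    println(rollingUpdates(upper).mkString(" "))                  // six updates
    println(finalResults(upper).toSeq.sortBy(_._1).mkString(" ")) // three results
  }
}
```

Under this model, the batch output (ALICE,2), (BOB,1), (LILY,3) is the last streaming update for each key, which is why a single (BOB,1) line suggests that rows were lost rather than merely reordered.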
[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode
[ https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhangzh updated FLINK-25471:
Summary: wrong result if table toDataStream then keyey by sum in Batch Mode (was: wrong result if table toDataStream then keyey by sum)