[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode

2021-12-28 Thread zhangzh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangzh updated FLINK-25471:

Description: 
I have a DataStream with 6 rows like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

Then I convert it into a table with one column, "word".

Then I apply a SQL transformation: select upper(word) from tmp_table

Then I convert the result back to a DataStream.

Then I apply keyBy followed by sum.

In batch mode, I expect the correct result to be:

> (BOB,1)
> (ALICE,2)
> (LILY,3)

but the actual result is:

> (BOB,1)

If I set a different parallelism, the result is different.

The source file and pom.xml are attached.
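The expected output and the parallelism remark above can be checked without Flink. The following is a minimal plain-Scala simulation of the same steps (not Flink code, and not the attached program): upper-case each word, map it to (word, 1), route records by key as keyBy does, and sum per key. The partitioner `Math.floorMod(key.hashCode, parallelism)` is an assumption standing in for Flink's real key-group assignment. The property it illustrates is that every record with the same key reaches the same partition, so the final totals should not depend on the parallelism.

```scala
import java.util.Locale

// Hypothetical, Flink-free simulation of the reported pipeline:
// UPPER(word) -> (word, 1) -> keyBy(word) -> sum(1), final results only.
object ExpectedWordCount {
  val input: Seq[String] = Seq("Alice", "alice", "Bob", "lily", "lily", "lily")

  // Assumed stand-in for Flink's key-group assignment: equal keys always
  // map to the same partition index.
  def partitionOf(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Final per-key sums when the keyed stage runs with the given parallelism.
  def countWith(parallelism: Int): Map[String, Int] = {
    // Locale.ROOT avoids locale-dependent upper-casing (e.g. the Turkish dotted i).
    val pairs = input.map(w => (w.toUpperCase(Locale.ROOT), 1))
    // Route each record to a simulated subtask by its key, as keyBy does.
    val partitions = pairs.groupBy { case (key, _) => partitionOf(key, parallelism) }
    // Each subtask sums the keys it owns; a key never spans two subtasks,
    // so merging the per-subtask results cannot lose or overwrite counts.
    partitions.values.flatMap { records =>
      records.groupBy(_._1).map { case (key, rs) => key -> rs.map(_._2).sum }
    }.toMap
  }
}
```

Under these assumptions, `ExpectedWordCount.countWith(7)` equals `Map("ALICE" -> 2, "BOB" -> 1, "LILY" -> 3)`, the same totals as for parallelism 1.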

  was:
I have a DataStream with 6 rows like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

Then I convert it into a table with one column, "word".

Then I apply a SQL transformation: select upper(word) from tmp_table

Then I convert the result back to a DataStream.

Then I apply keyBy followed by sum.

In batch mode, I expect the correct result to be:

> (BOB,1)
> (ALICE,2)
> (LILY,3)

but the actual result is:

> (BOB,1)

If I set a different parallelism, the result is different.


> wrong result if table toDataStream then keyey by sum in Batch Mode
> --
>
> Key: FLINK-25471
> URL: https://issues.apache.org/jira/browse/FLINK-25471
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.2
>Reporter: zhangzh
>Priority: Critical
> Attachments: TableToDataStreamBatchWordCount.scala, pom.xml
>
>
> I have a DataStream with 6 rows like this:
> Row.of("Alice"),
> Row.of("alice"),
> Row.of("Bob"),
> Row.of("lily"),
> Row.of("lily"),
> Row.of("lily")
> Then I convert it into a table with one column, "word".
> Then I apply a SQL transformation: select upper(word) from tmp_table
> Then I convert the result back to a DataStream.
> Then I apply keyBy followed by sum.
>
> In batch mode, I expect the correct result to be:
> > (BOB,1)
> > (ALICE,2)
> > (LILY,3)
>
> but the actual result is:
> > (BOB,1)
> If I set a different parallelism, the result is different.
>
> The source file and pom.xml are attached.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode

2021-12-28 Thread zhangzh (Jira)



zhangzh updated FLINK-25471:

Attachment: TableToDataStreamBatchWordCount.scala

> wrong result if table toDataStream then keyey by sum in Batch Mode
> --
>
> Key: FLINK-25471
> URL: https://issues.apache.org/jira/browse/FLINK-25471
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.2
>Reporter: zhangzh
>Priority: Critical
> Attachments: TableToDataStreamBatchWordCount.scala, pom.xml
>
>
> I have a DataStream with 6 rows like this:
> Row.of("Alice"),
> Row.of("alice"),
> Row.of("Bob"),
> Row.of("lily"),
> Row.of("lily"),
> Row.of("lily")
> Then I convert it into a table with one column, "word".
> Then I apply a SQL transformation: select upper(word) from tmp_table
> Then I convert the result back to a DataStream.
> Then I apply keyBy followed by sum.
>
> In batch mode, I expect the correct result to be:
> > (BOB,1)
> > (ALICE,2)
> > (LILY,3)
>
> but the actual result is:
> > (BOB,1)
> If I set a different parallelism, the result is different.





[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode

2021-12-28 Thread zhangzh (Jira)



zhangzh updated FLINK-25471:

Description: 
I have a DataStream with 6 rows like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

Then I convert it into a table with one column, "word".

Then I apply a SQL transformation: select upper(word) from tmp_table

Then I convert the result back to a DataStream.

Then I apply keyBy followed by sum.

In batch mode, I expect the correct result to be:

> (BOB,1)
> (ALICE,2)
> (LILY,3)

but the actual result is:

> (BOB,1)

If I set a different parallelism, the result is different.

  was:
I have 6 rows like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

Then I convert it into a table with one column, "word".

Then I apply a SQL transformation: select upper(word) from tmp_table

> wrong result if table toDataStream then keyey by sum in Batch Mode
> --
>
> Key: FLINK-25471
> URL: https://issues.apache.org/jira/browse/FLINK-25471
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.2
>Reporter: zhangzh
>Priority: Critical
> Attachments: pom.xml
>
>
> I have a DataStream with 6 rows like this:
> Row.of("Alice"),
> Row.of("alice"),
> Row.of("Bob"),
> Row.of("lily"),
> Row.of("lily"),
> Row.of("lily")
> Then I convert it into a table with one column, "word".
> Then I apply a SQL transformation: select upper(word) from tmp_table
> Then I convert the result back to a DataStream.
> Then I apply keyBy followed by sum.
>
> In batch mode, I expect the correct result to be:
> > (BOB,1)
> > (ALICE,2)
> > (LILY,3)
>
> but the actual result is:
> > (BOB,1)
> If I set a different parallelism, the result is different.





[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode

2021-12-28 Thread zhangzh (Jira)



zhangzh updated FLINK-25471:

Attachment: pom.xml






[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode

2021-12-28 Thread zhangzh (Jira)



zhangzh updated FLINK-25471:

Description: 
I have 6 rows like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

Then I convert it into a table with one column, "word".

Then I apply a SQL transformation: select upper(word) from tmp_table
  was:
I have 6 rows like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

Then I convert it into a table with one column, "word".

> wrong result if table toDataStream then keyey by sum in Batch Mode
> --
>
> Key: FLINK-25471
> URL: https://issues.apache.org/jira/browse/FLINK-25471
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.2
>Reporter: zhangzh
>Priority: Critical
>
> I have 6 rows like this:
> Row.of("Alice"),
> Row.of("alice"),
> Row.of("Bob"),
> Row.of("lily"),
> Row.of("lily"),
> Row.of("lily")
> Then I convert it into a table with one column, "word".
> Then I apply a SQL transformation: select upper(word) from tmp_table





[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode

2021-12-28 Thread zhangzh (Jira)



zhangzh updated FLINK-25471:

Description: 
I have 6 rows like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

Then I convert it into a table with one column, "word".
  was:
I have 6 rows like this:

Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")

Then I convert it into a table with one column, "word".



import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object TableToDataStreamBatchWordCount {

  def main(args: Array[String]): Unit = {

    // Create the DataStream environment and the bridged table environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    // The wrong result is reported in BATCH mode: use this line instead of the one above.
    //env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    // The reported (wrong) output changes with this value.
    env.setParallelism(7)
    val tableEnv = StreamTableEnvironment.create(env)

    // Test data: 6 single-column rows.
    val resultDS2 = env.fromElements(
      Row.of("Alice"),
      Row.of("alice"),
      Row.of("Bob"),
      Row.of("lily"),
      Row.of("lily"),
      Row.of("lily")
    )(Types.ROW(Types.STRING))

    // DataStream[Row] --> Table --> SQL UPPER() transformation.
    val table = tableEnv.fromDataStream(resultDS2).as("word")
    tableEnv.createTemporaryView("tmp_table", table)
    val resultTable = tableEnv.sqlQuery("SELECT UPPER(word) AS word FROM tmp_table")

    // Transformed Table --> DataStream[String].
    val resultDs = tableEnv.toDataStream(resultTable).map(row => row.getField("word").asInstanceOf[String])

    // Word count: map to (word, 1), key by word, sum the counts.
    val counts: DataStream[(String, Int)] = resultDs
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    // Print the result.
    counts.print()

    env.execute("WordCount")
  }
}
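The program above activates RuntimeExecutionMode.STREAMING and leaves BATCH commented out, and this changes what `sum(1)` prints. In STREAMING mode a keyed `sum` is a rolling aggregation that emits an updated tuple for every incoming record. According to the Flink documentation on execution modes, such rolling operations emit only the final result per key in BATCH mode, which matches the one-line-per-word output expected in the report. The sketch below is a hypothetical plain-Scala simulation of both behaviors (not Flink code), assuming a single ordered input. With parallelism above 1, the real interleaving of STREAMING output across keys is not deterministic.

```scala
import java.util.Locale

// Hypothetical simulation of what a keyed sum(1) emits, not Flink code.
object RollingVsFinal {
  val words: Seq[String] =
    Seq("Alice", "alice", "Bob", "lily", "lily", "lily").map(_.toUpperCase(Locale.ROOT))

  // STREAMING-style rolling sum: one (key, runningTotal) update per input record.
  def rolling(ws: Seq[String]): Vector[(String, Int)] =
    ws.foldLeft((Map.empty[String, Int], Vector.empty[(String, Int)])) {
      case ((totals, out), w) =>
        val total = totals.getOrElse(w, 0) + 1
        (totals.updated(w, total), out :+ (w -> total))
    }._2

  // BATCH-style: only the final total per key is emitted
  // (toMap keeps the last update seen for each key).
  def finalOnly(ws: Seq[String]): Map[String, Int] = rolling(ws).toMap
}
```

Under these assumptions, `RollingVsFinal.rolling(RollingVsFinal.words)` yields six updates, (ALICE,1), (ALICE,2), (BOB,1), (LILY,1), (LILY,2), (LILY,3), and `finalOnly` reduces them to ALICE -> 2, BOB -> 1, LILY -> 3.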


> wrong result if table toDataStream then keyey by sum in Batch Mode
> --
>
> Key: FLINK-25471
> URL: https://issues.apache.org/jira/browse/FLINK-25471
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.2
>Reporter: zhangzh
>Priority: Critical
>
> I have 6 rows like this:
> Row.of("Alice"),
> Row.of("alice"),
> Row.of("Bob"),
> Row.of("lily"),
> Row.of("lily"),
> Row.of("lily")
> Then I convert it into a table with one column, "word".





[jira] [Updated] (FLINK-25471) wrong result if table toDataStream then keyey by sum in Batch Mode

2021-12-28 Thread zhangzh (Jira)



zhangzh updated FLINK-25471:

Summary: wrong result if table toDataStream then keyey by sum in Batch Mode 
 (was: wrong result if table toDataStream then keyey by sum)

> wrong result if table toDataStream then keyey by sum in Batch Mode
> --
>
> Key: FLINK-25471
> URL: https://issues.apache.org/jira/browse/FLINK-25471
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.2
>Reporter: zhangzh
>Priority: Critical
>
> I have 6 rows like this:
> Row.of("Alice"),
> Row.of("alice"),
> Row.of("Bob"),
> Row.of("lily"),
> Row.of("lily"),
> Row.of("lily")
> Then I convert it into a table with one column, "word".


