[jira] [Updated] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.

2019-09-04 Thread Stefano (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefano updated FLINK-13944:

Environment: 
{code:bash}
$ java -version
openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)
{code}

{{--}}

{code:bash}
$ scala -version
Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL
{code}

{{--}}

{{build.sbt}}

[...]

ThisBuild / scalaVersion := "2.11.12"

val flinkVersion = "1.9.0"

val flinkDependencies = Seq(
 "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
 "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
 "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided")

[...]

 

  was:
{{$ java -version}}
{{ openjdk version "1.8.0_222"}}
{{ OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)}}
{{ OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)}}

{{--}}

{{$ scala -version}}
{{Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL}}

{{--}}

{{build.sbt}}

[...]

ThisBuild / scalaVersion := "2.11.12"

val flinkVersion = "1.9.0"

val flinkDependencies = Seq(
 "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
 "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
 "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided")

[...]

 


> Table.toAppendStream: InvalidProgramException: Table program cannot be 
> compiled.
> 
>
> Key: FLINK-13944
> URL: https://issues.apache.org/jira/browse/FLINK-13944
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala, Table SQL / API
>Affects Versions: 1.8.1, 1.9.0
> Environment: {code:bash}
> $ java -version
> openjdk version "1.8.0_222"
> OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)
> OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)
> {code}
> {{--}}
> {code:bash}
> $ scala -version
> Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL
> {code}
> {{--}}
> {{build.sbt}}
> [...]
> ThisBuild / scalaVersion := "2.11.12"
> val flinkVersion = "1.9.0"
> val flinkDependencies = Seq(
>  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>  "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided")
> [...]
>  
>Reporter: Stefano
>Priority: Major
> Attachments: app.zip
>
>
> (The project in which I face the error is attached.)
> {{Using: Scala streaming API and the StreamTableEnvironment.}}
> {{Given the classes:}}
> {code:scala}
> object EntityType extends Enumeration {
>   type EntityType = Value
>   val ACTIVITY = Value
>  }
> sealed trait Entity extends Serializable
> case class Activity(card_id: Long, date_time: Timestamp, second: Long, 
> station_id: Long, station_name: String, activity_code: Long, amount: Long) 
> extends Entity
> {code}
> What I try to do to convert a table after selection to an appendStream:
> {code:scala}
> /** activity table **/
> val activityDataStream = partialComputation1
>   .filter(_._1 == EntityType.ACTIVITY)
>   .map(x => x._3.asInstanceOf[Activity])
> tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 
> 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)
> val selectedTable = tableEnv.scan("activity").select("card_id, second")
> selectedTable.printSchema()
> // root
> //   |-- card_id: BIGINT
> //   |-- second: BIGINT
> // ATTEMPT 1
> //val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)
> //output.print
> // ATTEMPT 2
> //val output = tableEnv.toAppendStream[(java.lang.Long, 
> java.lang.Long)](selectedTable)
> //output.print
> // ATTEMPT 3
> //val output = tableEnv.toAppendStream[Row](selectedTable)
> //output.print
> // ATTEMPT 4
> case class Test(card_id: Long, second: Long) extends Entity
> val output = tableEnv.toAppendStream[Test](selectedTable)
> output.print
> {code}
> In any of the attempts the error I get is always the same:
> {code:bash}
> $ flink run target/scala-2.11/app-assembly-0.1.jar 
> Starting execution of program
> root
>  |-- card_id: BIGINT
>  |-- second: BIGINT
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9954823e0b55a8140f78be6868c85399)
>   at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>   at 

[jira] [Updated] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.

2019-09-03 Thread Stefano (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefano updated FLINK-13944:

Description: 
(The project in which I face the error is attached.)

{{Using: Scala streaming API and the StreamTableEnvironment.}}

{{Given the classes:}}
{code:scala}
object EntityType extends Enumeration {
  type EntityType = Value
  val ACTIVITY = Value
 }
sealed trait Entity extends Serializable

case class Activity(card_id: Long, date_time: Timestamp, second: Long, 
station_id: Long, station_name: String, activity_code: Long, amount: Long) 
extends Entity
{code}
What I try to do is convert a table, after selection, to an append stream:
{code:scala}
/** activity table **/
val activityDataStream = partialComputation1
  .filter(_._1 == EntityType.ACTIVITY)
  .map(x => x._3.asInstanceOf[Activity])
tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 
'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)


val selectedTable = tableEnv.scan("activity").select("card_id, second")
selectedTable.printSchema()
// root
//   |-- card_id: BIGINT
//   |-- second: BIGINT

// ATTEMPT 1
//val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)
//output.print

// ATTEMPT 2
//val output = tableEnv.toAppendStream[(java.lang.Long, 
java.lang.Long)](selectedTable)
//output.print

// ATTEMPT 3
//val output = tableEnv.toAppendStream[Row](selectedTable)
//output.print

// ATTEMPT 4
case class Test(card_id: Long, second: Long) extends Entity
val output = tableEnv.toAppendStream[Test](selectedTable)
output.print
{code}
In every attempt the error I get is the same:
{code:bash}
$ flink run target/scala-2.11/app-assembly-0.1.jar 

Starting execution of program
root
 |-- card_id: BIGINT
 |-- second: BIGINT



 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9954823e0b55a8140f78be6868c85399)
  at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
  at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
  at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
  at bds_comparison.flink.metrocard.App$.main(App.scala:141)
  at bds_comparison.flink.metrocard.App.main(App.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
  at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
  at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
  at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
  at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
  at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
  at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
  at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 23 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
  at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
  at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
  at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOu

[jira] [Updated] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.

2019-09-03 Thread Stefano (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefano updated FLINK-13944:

Description: 
{{Using: Scala streaming API and the StreamTableEnvironment.}}

{{Given the classes:}}

{code:scala}
object EntityType extends Enumeration {
  type EntityType = Value
  val ACTIVITY = Value
 }
sealed trait Entity extends Serializable

case class Activity(card_id: Long, date_time: Timestamp, second: Long, 
station_id: Long, station_name: String, activity_code: Long, amount: Long) 
extends Entity
{code}

 

What I try to do is convert a table, after selection, to an append stream.

 

{code:scala}
/** activity table **/
val activityDataStream = partialComputation1
  .filter(_._1 == EntityType.ACTIVITY)
  .map(x => x._3.asInstanceOf[Activity])
tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)

val selectedTable = tableEnv.scan("activity").select("card_id, second")
selectedTable.printSchema()
// root
//   |-- card_id: BIGINT
//   |-- second: BIGINT

// ATTEMPT 1
//val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)
//output.print

// ATTEMPT 2
//val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)
//output.print

// ATTEMPT 3
//val output = tableEnv.toAppendStream[Row](selectedTable)
//output.print

// ATTEMPT 4
case class Test(card_id: Long, second: Long) extends Entity
val output = tableEnv.toAppendStream[Test](selectedTable)
output.print
{code}

 

The result for each of the attempts is always the same:

 

{code:bash}
--- The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 334fe364c516008ca34b76e27c5c6f79)
  at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
  at
... 23 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
  at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
  at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
  at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
  at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
{code}

 

My project in which I face the error is attached.
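A note for readers hitting the same failure: a frequent cause of this Janino compile error (an assumption on my part, not confirmed in this ticket) is that the target case class of ATTEMPT 4 is declared inside the enclosing method, so it compiles to a local class that the generated table program cannot reference by a stable name. A minimal sketch of the usual workaround, assuming that is the cause here, is to move the declaration to top level:

{code:scala}
// Sketch only: Test mirrors ATTEMPT 4's class, but is declared at top
// level (outside any method) so generated code can instantiate it by
// its stable binary name.
case class Test(card_id: Long, second: Long)

object App {
  def main(args: Array[String]): Unit = {
    // ... create tableEnv and selectedTable as in the snippet above ...
    // val output = tableEnv.toAppendStream[Test](selectedTable)
    // output.print()
  }
}
{code}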

  was:
{{Using: Scala streaming API and the StreamTableEnvironment.}}

{{Given the classes:}}

{code:scala}
object EntityType extends Enumeration {
  type EntityType = Value
  val ACTIVITY = Value
}

sealed trait Entity extends Serializable

case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity
{code}

 

What I try to do is convert a table, after selection, to an append stream.

 

{code:scala}
/** activity table **/
val activityDataStream = partialComputation1
  .filter(_._1 == EntityType.ACTIVITY)
  .map(x => x._3.asInstanceOf[Activity])
tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)

val selectedTable = tableEnv.scan("activity").select("card_id, second")
selectedTable.printSchema()
// root
//   |-- card_id: BIGINT
//   |-- second: BIGINT

// ATTEMPT 1
//val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)
//output.print

// ATTEMPT 2
//val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)
//output.print

// ATTEMPT 3
//val output = tableEnv.toAppendStream[Row](selectedTable)
//output.print

// ATTEMPT 4
case class Test(card_id: Long, second: Long) extends Entity
val output = tableEnv.toAppendStream[Test](selectedTable)
output.print
{code}

 

The result for each of the attempts is always the same:

 

--- The program finished with the following exception:
org.apache.flink.client.progra

[jira] [Updated] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.

2019-09-03 Thread Stefano (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefano updated FLINK-13944:

Description: 
{{Using: Scala streaming API and the StreamTableEnvironment.}}

{{Given the classes:}}

{code:scala}
object EntityType extends Enumeration {
  type EntityType = Value
  val ACTIVITY = Value
}

sealed trait Entity extends Serializable

case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity
{code}

 

What I try to do is convert a table, after selection, to an append stream.

 

{code:scala}
/** activity table **/
val activityDataStream = partialComputation1
  .filter(_._1 == EntityType.ACTIVITY)
  .map(x => x._3.asInstanceOf[Activity])
tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)

val selectedTable = tableEnv.scan("activity").select("card_id, second")
selectedTable.printSchema()
// root
//   |-- card_id: BIGINT
//   |-- second: BIGINT

// ATTEMPT 1
//val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)
//output.print

// ATTEMPT 2
//val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)
//output.print

// ATTEMPT 3
//val output = tableEnv.toAppendStream[Row](selectedTable)
//output.print

// ATTEMPT 4
case class Test(card_id: Long, second: Long) extends Entity
val output = tableEnv.toAppendStream[Test](selectedTable)
output.print
{code}

 

The result for each of the attempts is always the same:

 

{code:bash}
--- The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 334fe364c516008ca34b76e27c5c6f79)
  at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
  at
... 23 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
  at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
  at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
  at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
  at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
{code}

 

My project in which I face the error is attached.

  was:
{{Using: Scala streaming API and the StreamTableEnvironment.}}

Given the classes:

{code:scala}
object EntityType extends Enumeration {
  type EntityType = Value
  val ACTIVITY = Value
}

sealed trait Entity extends Serializable

case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity
{code}

 

What I try to do is convert a table, after selection, to an append stream.

 

{code:scala}
/** activity table **/
val activityDataStream = partialComputation1
  .filter(_._1 == EntityType.ACTIVITY)
  .map(x => x._3.asInstanceOf[Activity])
tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)

val selectedTable = tableEnv.scan("activity").select("card_id, second")
selectedTable.printSchema()
// root
//   |-- card_id: BIGINT
//   |-- second: BIGINT

// ATTEMPT 1
//val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)
//output.print

// ATTEMPT 2
//val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)
//output.print

// ATTEMPT 3
//val output = tableEnv.toAppendStream[Row](selectedTable)
//output.print

// ATTEMPT 4
case class Test(card_id: Long, second: Long) extends Entity
val output = tableEnv.toAppendStream[Test](selectedTable)
output.print
{code}

 

The result for each of the attempts is always the same:

 

--- The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocation

[jira] [Created] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.

2019-09-03 Thread Stefano (Jira)
Stefano created FLINK-13944:
---

 Summary: Table.toAppendStream: InvalidProgramException: Table 
program cannot be compiled.
 Key: FLINK-13944
 URL: https://issues.apache.org/jira/browse/FLINK-13944
 Project: Flink
  Issue Type: Bug
  Components: API / Scala, Table SQL / API
Affects Versions: 1.9.0, 1.8.1
 Environment: {{$ java -version}}
{{ openjdk version "1.8.0_222"}}
{{ OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)}}
{{ OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)}}

{{--}}

{{$ scala -version}}
{{Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL}}

{{--}}

{{build.sbt}}

[...]

ThisBuild / scalaVersion := "2.11.12"

val flinkVersion = "1.9.0"

val flinkDependencies = Seq(
 "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
 "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
 "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided")

[...]

 
Reporter: Stefano
 Attachments: app.zip

{{Using: Scala streaming API and the StreamTableEnvironment.}}

Given the classes:

{code:scala}
object EntityType extends Enumeration {
  type EntityType = Value
  val ACTIVITY = Value
}

sealed trait Entity extends Serializable

case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity
{code}

 

What I try to do is convert a table, after selection, to an append stream.

 

{code:scala}
/** activity table **/
val activityDataStream = partialComputation1
  .filter(_._1 == EntityType.ACTIVITY)
  .map(x => x._3.asInstanceOf[Activity])
tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)

val selectedTable = tableEnv.scan("activity").select("card_id, second")
selectedTable.printSchema()
// root
//   |-- card_id: BIGINT
//   |-- second: BIGINT

// ATTEMPT 1
//val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)
//output.print

// ATTEMPT 2
//val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)
//output.print

// ATTEMPT 3
//val output = tableEnv.toAppendStream[Row](selectedTable)
//output.print

// ATTEMPT 4
case class Test(card_id: Long, second: Long) extends Entity
val output = tableEnv.toAppendStream[Test](selectedTable)
output.print
{code}

 

The result for each of the attempts is always the same:

 

{code:bash}
--- The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 334fe364c516008ca34b76e27c5c6f79)
  at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
  at
... 23 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
  at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
  at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
  at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
  at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
{code}

 

My project in which I face the error is attached.
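The difference between a top-level and a method-local case class can be illustrated in plain Scala, independent of Flink. This is a hypothetical sketch of why a case class declared inside a method may be unusable by an external code generator: it compiles to a synthetic inner class (the names in the comments are assumptions about the compiler's naming scheme, not verified output):

{code:scala}
object LocalClassDemo {
  // Top-level nested case class: addressable by a stable binary name.
  case class TopLevel(card_id: Long, second: Long)

  def main(args: Array[String]): Unit = {
    // Method-local case class: compiled to a synthetic class that external
    // code generators (e.g. a table runtime) cannot reference by name.
    case class Local(card_id: Long, second: Long)

    println(classOf[TopLevel].getName)      // e.g. LocalClassDemo$TopLevel
    println(Local(1L, 2L).getClass.getName) // e.g. LocalClassDemo$Local$1
  }
}
{code}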



--
This message was sent by Atlassian Jira
(v8.3.2#803003)