[jira] [Updated] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.
[ https://issues.apache.org/jira/browse/FLINK-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefano updated FLINK-13944: Environment: {code:bash} $ java -version openjdk version "1.8.0_222" OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode {code} {{--}} {code:bash} $ scala -version Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL {code} {{--}} {{build.}}{{sbt}} [...] ThisBuild / scalaVersion := "2.11.12" val flinkVersion = "1.9.0" val flinkDependencies = Seq( "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided") [...] was: {{$ java -version}} {{ openjdk version "1.8.0_222"}} {{ OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)}} {{ OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)}} {{--}} {{$ scala -version}} {{Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL}} {{--}} {{build.}}{{sbt}} [...] ThisBuild / scalaVersion := "2.11.12" val flinkVersion = "1.9.0" val flinkDependencies = Seq( "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided") [...] > Table.toAppendStream: InvalidProgramException: Table program cannot be > compiled. > > > Key: FLINK-13944 > URL: https://issues.apache.org/jira/browse/FLINK-13944 > Project: Flink > Issue Type: Bug > Components: API / Scala, Table SQL / API >Affects Versions: 1.8.1, 1.9.0 > Environment: {code:bash} > $ java -version > openjdk version "1.8.0_222" > OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10 > OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode > {code} > {{--}} > {code:bash} > $ scala -version > Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL > {code} > {{--}} > {{build.}}{{sbt}} > [...] > ThisBuild / scalaVersion := "2.11.12" > val flinkVersion = "1.9.0" > val flinkDependencies = Seq( > "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", > "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided") > [...] > >Reporter: Stefano >Priority: Major > Attachments: app.zip > > > (The project in which I face the error is attached.) > {{Using: Scala streaming API and the StreamTableEnvironment.}} > {{Given the classes:}} > {code:scala} > object EntityType extends Enumeration { > type EntityType = Value > val ACTIVITY = Value > } > sealed trait Entity extends Serializable > case class Activity(card_id: Long, date_time: Timestamp, second: Long, > station_id: Long, station_name: String, activity_code: Long, amount: Long) > extends Entity > {code} > What I try to do to convert a table after selection to an appendStream: > {code:scala} > /** activity table **/ > val activityDataStream = partialComputation1 > .filter(_._1 == EntityType.ACTIVITY) > .map(x => x._3.asInstanceOf[Activity]) > tableEnv.registerDataStream("activity", activityDataStream, 'card_id, > 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount) > val selectedTable = tableEnv.scan("activity").select("card_id, second") > selectedTable.printSchema() > // root > // |-- card_id: BIGINT > // |-- second: BIGINT > // ATTEMPT 1 > //val output = tableEnv.toAppendStream[(Long, Long)](selectedTable) > //output.print > // ATTEMPT 2 > //val output = tableEnv.toAppendStream[(java.lang.Long, > java.lang.Long)](selectedTable) > //output.print > // ATTEMPT 3 > //val output = tableEnv.toAppendStream[Row](selectedTable) > //output.print > // ATTEMPT 4 > case class Test(card_id: Long, second: Long) extends Entity > val output = tableEnv.toAppendStream[Test](selectedTable) > output.print > {code} > In any of the attempts the error I get is always the same: > {code:bash} > $ flink run target/scala-2.11/app-assembly-0.1.jar > Starting execution of program > root > |-- card_id: BIGINT > |-- second: BIGINT > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: Job failed. > (JobID: 9954823e0b55a8140f78be6868c85399) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) > at
[jira] [Updated] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.
[ https://issues.apache.org/jira/browse/FLINK-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefano updated FLINK-13944: Description: (The project in which I face the error is attached.) {{Using: Scala streaming API and the StreamTableEnvironment.}} {{Given the classes:}} {code:scala} object EntityType extends Enumeration { type EntityType = Value val ACTIVITY = Value } sealed trait Entity extends Serializable case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity {code} What I try to do to convert a table after selection to an appendStream: {code:scala} /** activity table **/ val activityDataStream = partialComputation1 .filter(_._1 == EntityType.ACTIVITY) .map(x => x._3.asInstanceOf[Activity]) tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount) val selectedTable = tableEnv.scan("activity").select("card_id, second") selectedTable.printSchema() // root // |-- card_id: BIGINT // |-- second: BIGINT // ATTEMPT 1 //val output = tableEnv.toAppendStream[(Long, Long)](selectedTable) //output.print // ATTEMPT 2 //val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable) //output.print // ATTEMPT 3 //val output = tableEnv.toAppendStream[Row](selectedTable) //output.print // ATTEMPT 4 case class Test(card_id: Long, second: Long) extends Entity val output = tableEnv.toAppendStream[Test](selectedTable) output.print {code} In any of the attempts the error I get is always the same: {code:bash} $ flink run target/scala-2.11/app-assembly-0.1.jar Starting execution of program root |-- card_id: BIGINT |-- second: BIGINT The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 9954823e0b55a8140f78be6868c85399) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654) at bds_comparison.flink.metrocard.App$.main(App.scala:141) at bds_comparison.flink.metrocard.App.main(App.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259) ... 23 more Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOu
[jira] [Updated] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.
[ https://issues.apache.org/jira/browse/FLINK-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefano updated FLINK-13944: Description: {{Using: Scala streaming API and the StreamTableEnvironment.}} {{Given the classes:}} {code:scala} object EntityType extends Enumeration { type EntityType = Value val ACTIVITY = Value } sealed trait Entity extends Serializable case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity {code} What I try to do is\{{ to convert a table after selection to an appendStream.}} {{/** activity table **/}} {{val activityDataStream = partialComputation1}} \{{ .filter(_._1 == EntityType.ACTIVITY)}} {{ .map(x => x._3.asInstanceOf[Activity])}} {{tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}} {{val selectedTable = tableEnv.scan("activity").select("card_id, second")}} {{selectedTable.printSchema()}} {{// root}} {{// |-- card_id: BIGINT}} {{// |-- second: BIGINT}} {{// ATTEMPT 1}} {{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 2}} {{// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 3}} {{// val output = tableEnv.toAppendStream[Row](selectedTable)}} {{// output.print}} {{// ATTEMPT 4}} {{case class Test(card_id: Long, second: Long) extends Entity}}{{val output = tableEnv.toAppendStream[Test](selectedTable)}} {{output.print}} The result for each of the attempts is always the same: {{--- The program finished with the following exception:}} {{org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 334fe364c516008ca34b76e27c5c6f79) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at ... 23 more Caused by: org.apache.flink.api.common.InvalidProgramException: *Table program cannot be compiled. This is a bug. Please file an issue.* at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748)}} My project in which I face the error is attached. was: {{Using: Scala streaming API and the StreamTableEnvironment.}} {{Given the classes:}} object EntityType extends Enumeration { type EntityType = Value val ACTIVITY = Value } sealed trait Entity extends Serializable {{case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity}} What I try to do is\{{ to convert a table after selection to an appendStream.}} {{/** activity table **/}} {{val activityDataStream = partialComputation1}} \{{ .filter(_._1 == EntityType.ACTIVITY)}} {{ .map(x => x._3.asInstanceOf[Activity])}} {{tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}} {{val selectedTable = tableEnv.scan("activity").select("card_id, second")}} {{selectedTable.printSchema()}} {{// root}} {{// |-- card_id: BIGINT}} {{// |-- second: BIGINT}} {{// ATTEMPT 1}} {{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 2}} {{// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 3}} {{// val output = tableEnv.toAppendStream[Row](selectedTable)}} {{// output.print}} {{// ATTEMPT 4}} {{case class Test(card_id: Long, second: Long) extends Entity}}{{val output = tableEnv.toAppendStream[Test](selectedTable)}} {{output.print}} The result for each of the attempts is always the same: {{--- The program finished with the following exception:}} {{org.apache.flink.client.progra
[jira] [Updated] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.
[ https://issues.apache.org/jira/browse/FLINK-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefano updated FLINK-13944: Description: {{Using: Scala streaming API and the StreamTableEnvironment.}} {{Given the classes:}} object EntityType extends Enumeration { type EntityType = Value val ACTIVITY = Value } sealed trait Entity extends Serializable {{case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity}} What I try to do is\{{ to convert a table after selection to an appendStream.}} {{/** activity table **/}} {{val activityDataStream = partialComputation1}} \{{ .filter(_._1 == EntityType.ACTIVITY)}} {{ .map(x => x._3.asInstanceOf[Activity])}} {{tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}} {{val selectedTable = tableEnv.scan("activity").select("card_id, second")}} {{selectedTable.printSchema()}} {{// root}} {{// |-- card_id: BIGINT}} {{// |-- second: BIGINT}} {{// ATTEMPT 1}} {{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 2}} {{// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 3}} {{// val output = tableEnv.toAppendStream[Row](selectedTable)}} {{// output.print}} {{// ATTEMPT 4}} {{case class Test(card_id: Long, second: Long) extends Entity}}{{val output = tableEnv.toAppendStream[Test](selectedTable)}} {{output.print}} The result for each of the attempts is always the same: {{--- The program finished with the following exception:}} {{org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 334fe364c516008ca34b76e27c5c6f79) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at ... 23 more Caused by: org.apache.flink.api.common.InvalidProgramException: *Table program cannot be compiled. This is a bug. Please file an issue.* at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748)}} My project in which I face the error is attached. was: {{Using: Scala streaming API and the StreamTableEnvironment.}} Given the classes: {{object EntityType extends Enumeration {}} {{ type EntityType = Value}} {{ val ACTIVITY = Value}} {{}}} {{sealed trait Entity extends Serializable}} {{case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity}} What I try to do is{{ to convert a table after selection to an appendStream.}} {{/** activity table **/}} {{val activityDataStream = partialComputation1}} {{ .filter(_._1 == EntityType.ACTIVITY)}} {{ .map(x => x._3.asInstanceOf[Activity])}} {{tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}} {{val selectedTable = tableEnv.scan("activity").select("card_id, second")}} {{selectedTable.printSchema()}} {{// root}} {{// |-- card_id: BIGINT}} {{// |-- second: BIGINT}} {{// ATTEMPT 1}} {{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 2}} {{// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 3}} {{// val output = tableEnv.toAppendStream[Row](selectedTable)}} {{// output.print}} {{// ATTEMPT 4}} {{case class Test(card_id: Long, second: Long) extends Entity}}{{val output = tableEnv.toAppendStream[Test](selectedTable)}} {{output.print}} The result for each of the attempts is always the same: {{--- The program finished with the following exception:}} {{org.apache.flink.client.program.ProgramInvocation
[jira] [Created] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.
Stefano created FLINK-13944: --- Summary: Table.toAppendStream: InvalidProgramException: Table program cannot be compiled. Key: FLINK-13944 URL: https://issues.apache.org/jira/browse/FLINK-13944 Project: Flink Issue Type: Bug Components: API / Scala, Table SQL / API Affects Versions: 1.9.0, 1.8.1 Environment: {{$ java -version}} {{ openjdk version "1.8.0_222"}} {{ OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)}} {{ OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)}} {{--}} {{$ scala -version}} {{Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL}} {{--}} {{build.}}{{sbt}} [...] ThisBuild / scalaVersion := "2.11.12" val flinkVersion = "1.9.0" val flinkDependencies = Seq( "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided") [...] Reporter: Stefano Attachments: app.zip {{Using: Scala streaming API and the StreamTableEnvironment.}} Given the classes: {{object EntityType extends Enumeration {}} {{ type EntityType = Value}} {{ val ACTIVITY = Value}} {{}}} {{sealed trait Entity extends Serializable}} {{case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity}} What I try to do is{{ to convert a table after selection to an appendStream.}} {{/** activity table **/}} {{val activityDataStream = partialComputation1}} {{ .filter(_._1 == EntityType.ACTIVITY)}} {{ .map(x => x._3.asInstanceOf[Activity])}} {{tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}} {{val selectedTable = tableEnv.scan("activity").select("card_id, second")}} {{selectedTable.printSchema()}} {{// root}} {{// |-- card_id: BIGINT}} {{// |-- second: BIGINT}} {{// ATTEMPT 1}} {{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 2}} {{// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 3}} {{// val output = tableEnv.toAppendStream[Row](selectedTable)}} {{// output.print}} {{// ATTEMPT 4}} {{case class Test(card_id: Long, second: Long) extends Entity}}{{val output = tableEnv.toAppendStream[Test](selectedTable)}} {{output.print}} The result for each of the attempts is always the same: {{--- The program finished with the following exception:}} {{org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 334fe364c516008ca34b76e27c5c6f79) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at ... 23 more Caused by: org.apache.flink.api.common.InvalidProgramException: *Table program cannot be compiled. This is a bug. Please file an issue.* at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748)}} My project in which I face the error is attached. -- This message was sent by Atlassian Jira (v8.3.2#803003)