[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835457#comment-17835457 ]

Sergey Nuyanzin commented on FLINK-28693:
-----------------------------------------

1.18: [f5c62abf7475ea8bc976de2a2079b1a9e29b79df|https://github.com/apache/flink/commit/f5c62abf7475ea8bc976de2a2079b1a9e29b79df]

> Codegen failed if the watermark is defined on a columnByExpression
> ------------------------------------------------------------------
>
>                 Key: FLINK-28693
>                 URL: https://issues.apache.org/jira/browse/FLINK-28693
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.15.1
>            Reporter: Hongbo
>            Assignee: xuyang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.20.0
>
>
> The following code will throw an exception:
>
> {code:java}
> Table program cannot be compiled. This is a bug. Please file an issue.
> ...
> Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 54: Cannot determine simple type name "org" {code}
> Code:
> {code:java}
> import java.time.LocalDateTime;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.annotation.DataTypeHint;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.TableDescriptor;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.junit.jupiter.api.Test;
>
> public class FlinkTest {
>
>     // Flink requires UDF classes to be public; a nested UDF class must also be static.
>     public static class TestUdf extends ScalarFunction {
>         @DataTypeHint("TIMESTAMP(3)")
>         public LocalDateTime eval(String strDate) {
>             return LocalDateTime.now();
>         }
>     }
>
>     @Test
>     void testUdf() throws Exception {
>         // var env = StreamExecutionEnvironment.createLocalEnvironment();
>         // Run `gradlew shadowJar` first to generate the uber jar.
>         // It contains the Kafka connector and a dummy UDF.
>         var env = StreamExecutionEnvironment.createRemoteEnvironment(
>                 "localhost", 8081, "build/libs/flink-test-all.jar");
>         env.setParallelism(1);
>         var tableEnv = StreamTableEnvironment.create(env);
>         tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class);
>         var testTable = tableEnv.from(TableDescriptor.forConnector("kafka")
>                 .schema(Schema.newBuilder()
>                         .column("time_stamp", DataTypes.STRING())
>                         // derived column computed by the UDF
>                         .columnByExpression("udf_ts", "TEST_UDF(time_stamp)")
>                         // watermark defined on the derived column
>                         .watermark("udf_ts", "udf_ts - INTERVAL '1' second")
>                         .build())
>                 // The Kafka server doesn't need to exist. It fails in the
>                 // compile stage before fetching data.
>                 .option("properties.bootstrap.servers", "localhost:9092")
>                 .option("topic", "test_topic")
>                 .option("format", "json")
>                 .option("scan.startup.mode", "latest-offset")
>                 .build());
>         testTable.printSchema();
>         tableEnv.createTemporaryView("test", testTable);
>         var query = tableEnv.sqlQuery("select * from test");
>         var tableResult =
>                 query.executeInsert(TableDescriptor.forConnector("print").build());
>         tableResult.await();
>     }
> }{code}
> What does the code do?
> # read a stream from Kafka
> # create a derived column using a UDF expression
> # define the watermark based on the derived column
> The full callstack:
>
> {code:java}
> org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.1.jar:1.15.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
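The component that fails to compile in the stack trace is the generated watermark generator for the expression `udf_ts - INTERVAL '1' second` from the reproduction quoted above. As a rough mental model only, the sketch below shows what that expression means at runtime: each row proposes its timestamp minus the delay, and the emitted watermark never moves backwards (Flink does not emit regressing watermarks). This is plain Java, not Flink's generated code or API; the class `BoundedDelayWatermarkModel` and the sample timestamps are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;

/**
 * Hypothetical plain-Java model (not Flink's generated WatermarkGenerator) of a
 * watermark defined as "udf_ts - INTERVAL '1' SECOND".
 */
public class BoundedDelayWatermarkModel {
    private final Duration delay;
    private LocalDateTime current; // null until the first row has been seen

    public BoundedDelayWatermarkModel(Duration delay) {
        this.delay = delay;
    }

    /** Evaluates the watermark expression for one row and returns the current watermark. */
    public LocalDateTime onRow(LocalDateTime rowTime) {
        LocalDateTime candidate = rowTime.minus(delay);
        // Keep the maximum so that the watermark is non-decreasing.
        if (current == null || candidate.isAfter(current)) {
            current = candidate;
        }
        return current;
    }

    public static void main(String[] args) {
        BoundedDelayWatermarkModel wm = new BoundedDelayWatermarkModel(Duration.ofSeconds(1));
        List<LocalDateTime> rows = List.of(
                LocalDateTime.parse("2024-02-08T16:25:10"),
                LocalDateTime.parse("2024-02-08T16:25:12"),
                LocalDateTime.parse("2024-02-08T16:25:11")); // out-of-order row
        for (LocalDateTime ts : rows) {
            System.out.println(ts + " -> watermark " + wm.onRow(ts));
        }
    }
}
```

For the out-of-order third row, the model keeps the watermark at 16:25:11 instead of moving it back to 16:25:10.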
[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833489#comment-17833489 ]

Sergey Nuyanzin commented on FLINK-28693:
-----------------------------------------

Merged as [0acf92f1c8a90dcb3eb2c1038c1cda3344b7b988|https://github.com/apache/flink/commit/0acf92f1c8a90dcb3eb2c1038c1cda3344b7b988]
[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815957#comment-17815957 ]

Sebastien Pereira commented on FLINK-28693:
-------------------------------------------

We are facing a similar issue in 1.18. Our analysis led to the following.

Once deployed, the job fails with: *_Could not instantiate generated class 'WatermarkGenerator$0'_*
{code:java}
org.apache.flink.runtime.taskmanager.Task [] - Source: source_1___TABLE[1] -> Calc[2] -> KafkaSinkTable6[3]: Writer -> KafkaSinkTable6[3]: Committer (1/1)#0 (b7ae8e7fdeab754fd21c02a17b7736aa_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause:
2024-02-08T16:25:56.613700525Z java.lang.RuntimeException: Could not instantiate generated class 'WatermarkGenerator$0'
2024-02-08T16:25:56.613703025Z     at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74) ~[flink-table-runtime-1.18.0.jar:1.18.0]
[...]
Caused by: org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.{code}
To reproduce, the source table should meet these conditions:
* the watermark definition is based on a computed column,
* the computed column relies on a UDF,
* the table uses the `kafka` connector.
{code:java}
CREATE TABLE `source_1___TABLE` (
  `ts` STRING,
  `ts___EVENT_TIME` AS CAST (TO_TIMESTAMP_UDF(`ts`) AS TIMESTAMP(3)),
  WATERMARK FOR `ts___EVENT_TIME` AS `ts___EVENT_TIME` - INTERVAL '1' MINUTE
){code}
Note that with the `filesystem` connector, the same declaration deploys and consumes events as expected. The behavior is the same whether the job is created and deployed directly through the Table API or through the SQL client using SQL statements.
[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815141#comment-17815141 ]

xuyang commented on FLINK-28693:
--------------------------------

This bug occurs because the code produced by codegen references a class in the table-planner package, but the classes in the table-planner package are hidden by table-planner-loader, so the classloader cannot find that class. I'll try to fix it.
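xuyang's explanation above can be illustrated without Flink. The sketch below is a minimal model under stated assumptions: `PlannerInternal` is a hypothetical stand-in for a table-planner class, and an empty `URLClassLoader` whose parent is the JDK platform loader plays the role of a loader that does not expose it. It is not how flink-table-planner-loader is actually implemented; it only shows that a class visible to one loader can be invisible to another.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Flink-independent model: a class the application loader sees is invisible to an isolated loader. */
public class HiddenClassDemo {

    /** Hypothetical stand-in for a class that only the planner's own loader can see. */
    public static class PlannerInternal {
    }

    /** Returns true if {@code loader} can resolve the class with the given binary name. */
    public static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String name = PlannerInternal.class.getName();

        // The loader that loaded this demo sees its own nested class.
        ClassLoader appLoader = HiddenClassDemo.class.getClassLoader();
        System.out.println("application loader sees " + name + ": " + canLoad(appLoader, name));

        // An empty loader that only delegates to the platform loader (JDK classes)
        // cannot see application classes, similar to generated code compiled against
        // a loader that does not expose the planner's internals.
        try (URLClassLoader isolated =
                new URLClassLoader(new URL[0], ClassLoader.getPlatformClassLoader())) {
            System.out.println("isolated loader sees " + name + ": " + canLoad(isolated, name));
            System.out.println("isolated loader sees java.lang.String: "
                    + canLoad(isolated, "java.lang.String"));
        }
    }
}
```

Running it prints `true`, `false`, and `true` for the three checks. When generated source refers to such a hidden class by its fully qualified name, the compiler presumably cannot resolve the package, which would be consistent with Janino's `Cannot determine simple type name "org"` message in the original report.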
[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804161#comment-17804161 ]

ude commented on FLINK-28693:
-----------------------------

Same problem: [FLINK-34016|https://issues.apache.org/jira/browse/FLINK-34016]
[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755761#comment-17755761 ]

yong yang commented on FLINK-28693:
-----------------------------------

A watermark on a column computed by a UDF may cause an error. When I replace json_value (a built-in function) with myUDF, there is no error.
[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571487#comment-17571487 ]

Hongbo commented on FLINK-28693:
--------------------------------

Replacing flink-table-planner-loader with the flink-table-planner jar from the opt/ folder solves the problem. Is there any drawback to the swap? (If not, why is flink-table-planner-loader the default? I have seen others report problems with this lib as well.)
[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571444#comment-17571444 ]

Hongbo commented on FLINK-28693:
--------------------------------

Is it caused by a change in the codegen logic, or is it a classloader problem caused by the new module {{flink-table-planner-loader}}?