[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727761#comment-17727761 ]

Shengkai Fang commented on FLINK-31967:
---------------------------------------

[~padavan] Yes, the fix has already been merged into master, 1.17 and 1.16. It will be part of a future 1.17 bug-fix release. For now, you need to build and package the latest 1.17 branch yourself. You are welcome to report any new problems here.

> SQL with LAG function NullPointerException
> ------------------------------------------
>
>                 Key: FLINK-31967
>                 URL: https://issues.apache.org/jira/browse/FLINK-31967
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: padavan
>            Assignee: Haojin Wang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2023-04-28-14-46-19-736.png,
> image-2023-04-28-15-06-48-184.png, image-2023-04-28-15-14-58-788.png,
> image-2023-04-28-15-17-49-144.png, image-2023-04-28-17-06-20-737.png,
> simpleFlinkKafkaLag.zip
>
>
> I want to make a query with the LAG function. And got Job Exception without
> any explanations.
>
> *Code:*
> {code:java}
> private static void t1_LeadLag(DataStream ds, StreamExecutionEnvironment env) {
>     StreamTableEnvironment te = StreamTableEnvironment.create(env);
>     Table t = te.fromDataStream(ds,
>             Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
>     te.createTemporaryView("users", t);
>     Table res = te.sqlQuery("SELECT userId, `count`,\n" +
>             " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity\n" +
>             " FROM users");
>     te.toChangelogStream(res).print();
> }{code}
>
> *Input:*
> {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"}
>
> *Exception:* I remove part about basic JobExecutionException and kept the
> important(i think)
> {code:java}
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
>     at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
>     at org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown Source)
>     at org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
>     at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source)
>     at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92)
>     at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42)
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>     at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.base/java.lang.Thread.run(Thread.java:829){code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726834#comment-17726834 ]

padavan commented on FLINK-31967:
---------------------------------

[~fsk119] Is it already in master? How can I check that it works? Will there be new versions of the Java packages, e.g. 1.17.2?
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726029#comment-17726029 ]

Shengkai Fang commented on FLINK-31967:
---------------------------------------

Merged into master: ac6aedbf0f35ba9734108a3c347e649bbf231c62
Merged into release-1.17: (TODO)
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17723826#comment-17723826 ]

padavan commented on FLINK-31967:
---------------------------------

[~fsk119] [~pyro] This looks like basic functionality that is often used. It's very strange that it still doesn't work. I hope to get this fixed as soon as possible (y)
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722665#comment-17722665 ]

Haojin Wang commented on FLINK-31967:
-------------------------------------

This problem is caused by Java primitive types. I will submit a PR to fix it.
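
To illustrate the primitive-type cause mentioned above, here is a minimal standalone sketch with no Flink on the classpath. It assumes, going only by the top frame of the stack trace (GenericRowData.getInt), that the failing getter unboxes a stored field value to `int` without a null check. The class and method names below are hypothetical stand-ins, not Flink's code.

```java
public class UnboxingNpeSketch {

    // Stand-in for a row's field storage: values are kept as boxed Objects.
    // Casting a null slot to the primitive int unboxes it and throws
    // NullPointerException.
    static int getInt(Object[] fields, int pos) {
        return (int) fields[pos];
    }

    // Null-safe variant: check the slot before unboxing, which is what a
    // getter for a nullable column would have to do.
    static Integer getIntOrNull(Object[] fields, int pos) {
        return fields[pos] == null ? null : (Integer) fields[pos];
    }

    public static void main(String[] args) {
        // For the first row of a partition, LAG has no previous value,
        // so the stored "previous" slot is null.
        Object[] firstRowAccumulator = { null };

        System.out.println("null-safe read: " + getIntOrNull(firstRowAccumulator, 0));

        try {
            getInt(firstRowAccumulator, 0);
        } catch (NullPointerException e) {
            System.out.println("primitive read failed with NullPointerException");
        }
    }
}
```

If that assumption holds, it would explain why the reported job fails on the very first input record: there is no earlier `count` for LAG to return yet.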
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721969#comment-17721969 ]

Shengkai Fang commented on FLINK-31967:
---------------------------------------

[~padavan] Sure. There will be a PR to fix this soon.
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721936#comment-17721936 ]

padavan commented on FLINK-31967:
---------------------------------

[~fsk119] It works with Integer. I think it is at least unexpected that int and Integer behave differently. Will there be a fix, or will you only improve the error message with a description?
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721756#comment-17721756 ] Shengkai Fang commented on FLINK-31967: --- [~padavan] after investigation, the type inference is not correct for the lag function. For a quick fix, you can modify the type of the `count` in the UserModel to `Integer`. > SQL with LAG function NullPointerException > -- > > Key: FLINK-31967 > URL: https://issues.apache.org/jira/browse/FLINK-31967 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: padavan >Priority: Major > Attachments: image-2023-04-28-14-46-19-736.png, > image-2023-04-28-15-06-48-184.png, image-2023-04-28-15-14-58-788.png, > image-2023-04-28-15-17-49-144.png, image-2023-04-28-17-06-20-737.png, > simpleFlinkKafkaLag.zip > > > I want to make a query with the LAG function. And got Job Exception without > any explanations. > > *Code:* > {code:java} > private static void t1_LeadLag(DataStream ds, > StreamExecutionEnvironment env) { > StreamTableEnvironment te = StreamTableEnvironment.create(env); > Table t = te.fromDataStream(ds, > Schema.newBuilder().columnByExpression("proctime", "proctime()").build()); > te.createTemporaryView("users", t); > Table res = te.sqlQuery("SELECT userId, `count`,\n" + > " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS > prev_quantity\n" + > " FROM users"); > te.toChangelogStream(res).print(); > }{code} > > *Input:* > {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"} > > *Exception:* I remove part about basic JobExecutionException and kept the > important(i think) > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) > at > org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245) > at > org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown > Source) > at > 
org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101) > at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829){code} -- This message was sent 
by Atlassian Jira (v8.20.10#820010)
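A minimal sketch of the quick fix suggested above, under stated assumptions: the `UserModel` shown here is a hypothetical stand-in (the reporter's real class is only in the attached zip), and `readAsPrimitive` merely imitates reading a row field as a primitive `int`; it is not Flink's actual code. It illustrates why a missing previous value can surface as a NullPointerException when the column is treated as a non-nullable `int`, and why a boxed `Integer` field avoids that.

```java
import java.time.LocalDateTime;

public class LagNullDemo {

    // Hypothetical stand-in for the reporter's UserModel; field names follow the JSON input.
    public static class UserModel {
        public int userId;
        public Integer count;      // boxed type, so the column can be inferred as nullable
        public LocalDateTime dt;
    }

    // Imitates a primitive getter: casting a null Object to int unboxes it and throws.
    public static int readAsPrimitive(Object[] fields, int pos) {
        return (int) fields[pos];
    }

    // Imitates a nullable getter: null is returned as-is.
    public static Integer readAsBoxed(Object[] fields, int pos) {
        return (Integer) fields[pos];
    }

    public static void main(String[] args) {
        // The first row of a partition has no previous value, so the slot holds null.
        Object[] emptyAccumulator = {null};

        System.out.println("boxed read: " + readAsBoxed(emptyAccumulator, 0));
        try {
            readAsPrimitive(emptyAccumulator, 0);
        } catch (NullPointerException e) {
            System.out.println("primitive read: NullPointerException");
        }
    }
}
```

With the boxed field, a null "previous" value is representable, which is the situation LAG produces for the first row of each partition.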
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720303#comment-17720303 ] padavan commented on FLINK-31967: - [~martijnvisser] [~jark] [~lincoln.86xy] If you need anything else, let me know, i'll do it. :) > SQL with LAG function NullPointerException > -- > > Key: FLINK-31967 > URL: https://issues.apache.org/jira/browse/FLINK-31967 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: padavan >Priority: Major > Attachments: image-2023-04-28-14-46-19-736.png, > image-2023-04-28-15-06-48-184.png, image-2023-04-28-15-14-58-788.png, > image-2023-04-28-15-17-49-144.png, image-2023-04-28-17-06-20-737.png, > simpleFlinkKafkaLag.zip > > > I want to make a query with the LAG function. And got Job Exception without > any explanations. > > *Code:* > {code:java} > private static void t1_LeadLag(DataStream ds, > StreamExecutionEnvironment env) { > StreamTableEnvironment te = StreamTableEnvironment.create(env); > Table t = te.fromDataStream(ds, > Schema.newBuilder().columnByExpression("proctime", "proctime()").build()); > te.createTemporaryView("users", t); > Table res = te.sqlQuery("SELECT userId, `count`,\n" + > " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS > prev_quantity\n" + > " FROM users"); > te.toChangelogStream(res).print(); > }{code} > > *Input:* > {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"} > > *Exception:* I remove part about basic JobExecutionException and kept the > important(i think) > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) > at > org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245) > at > org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown > Source) > at > 
org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101) > at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829){code} -- This message was sent 
by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717688#comment-17717688 ]
padavan commented on FLINK-31967: -
[~martijnvisser] I added the project [^simpleFlinkKafkaLag.zip] with two functions: one uses datagen and works, the other uses Kafka and throws the exception.
!image-2023-04-28-17-06-20-737.png!
Example Kafka input:
{code:java}
{"userId":1,"count":5,"dt":"2023-04-28T14:02:23.113Z"}
{code}
> SQL with LAG function NullPointerException > -- > > Key: FLINK-31967 > URL: https://issues.apache.org/jira/browse/FLINK-31967 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: padavan >Priority: Major > Attachments: image-2023-04-28-14-46-19-736.png, > image-2023-04-28-15-06-48-184.png, image-2023-04-28-15-14-58-788.png, > image-2023-04-28-15-17-49-144.png, image-2023-04-28-17-06-20-737.png, > simpleFlinkKafkaLag.zip > > > I want to make a query with the LAG function. And got Job Exception without > any explanations. > > *Code:* > {code:java} > private static void t1_LeadLag(DataStream ds, > StreamExecutionEnvironment env) { > StreamTableEnvironment te = StreamTableEnvironment.create(env); > Table t = te.fromDataStream(ds, > Schema.newBuilder().columnByExpression("proctime", "proctime()").build()); > te.createTemporaryView("users", t); > Table res = te.sqlQuery("SELECT userId, `count`,\n" + > " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS > prev_quantity\n" + > " FROM users"); > te.toChangelogStream(res).print(); > }{code} > > *Input:* > {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"} > > *Exception:* I remove part about basic JobExecutionException and kept the > important(i think) > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) > at > org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245) > at > 
org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown > Source) > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101) > at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
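The `dt` field in the sample Kafka record is an ISO-8601 UTC timestamp, and the reporter's timestamp assigner (in the `main` method posted elsewhere in this thread) calls `i.dt.toInstant(ZoneOffset.UTC).toEpochMilli()`, which suggests `dt` is a `LocalDateTime`. The following is a small sketch of that conversion using only `java.time`; the class and method names are hypothetical, and how the reporter's `JsonConverter` actually parses the field is not shown in the thread.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class DtTimestampSketch {

    // Parses an ISO-8601 instant such as "2023-04-28T14:02:23.113Z" into a UTC LocalDateTime.
    public static LocalDateTime parseDt(String dt) {
        return LocalDateTime.ofInstant(Instant.parse(dt), ZoneOffset.UTC);
    }

    // Mirrors the expression used in the reporter's timestamp assigner.
    public static long toEpochMillis(LocalDateTime dt) {
        return dt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime dt = parseDt("2023-04-28T14:02:23.113Z");
        System.out.println(dt + " -> " + toEpochMillis(dt));
    }
}
```

Because both steps use UTC, the round trip preserves the instant carried in the record.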
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717675#comment-17717675 ] Martijn Visser commented on FLINK-31967: That would be great for debugging, yes. > SQL with LAG function NullPointerException > -- > > Key: FLINK-31967 > URL: https://issues.apache.org/jira/browse/FLINK-31967 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: padavan >Priority: Major > Attachments: image-2023-04-28-14-46-19-736.png, > image-2023-04-28-15-06-48-184.png, image-2023-04-28-15-14-58-788.png, > image-2023-04-28-15-17-49-144.png > > > I want to make a query with the LAG function. And got Job Exception without > any explanations. > > *Code:* > {code:java} > private static void t1_LeadLag(DataStream ds, > StreamExecutionEnvironment env) { > StreamTableEnvironment te = StreamTableEnvironment.create(env); > Table t = te.fromDataStream(ds, > Schema.newBuilder().columnByExpression("proctime", "proctime()").build()); > te.createTemporaryView("users", t); > Table res = te.sqlQuery("SELECT userId, `count`,\n" + > " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS > prev_quantity\n" + > " FROM users"); > te.toChangelogStream(res).print(); > }{code} > > *Input:* > {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"} > > *Exception:* I remove part about basic JobExecutionException and kept the > important(i think) > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) > at > org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245) > at > org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown > Source) > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101) > at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source) > at > 
org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717672#comment-17717672 ]
padavan commented on FLINK-31967: -
[~martijnvisser] Wow, interesting: if I use DataGen, everything works fine:
{code:java}
private static void t1_LeadLag(StreamExecutionEnvironment env) {
    StreamTableEnvironment te = StreamTableEnvironment.create(env);
    te.executeSql(
        "CREATE TABLE users (\n" +
        " userId INT,\n" +
        " `count` INT,\n" +
        " proctime AS PROCTIME()\n" +
        ") WITH (\n" +
        " 'connector' = 'datagen',\n" +
        " 'fields.userId.min' = '1',\n" +
        " 'fields.userId.max' = '10',\n" +
        " 'fields.count.min' = '0',\n" +
        " 'fields.count.max' = '10'\n" +
        ");");
    /*
    Table t = te.fromDataStream(ds, Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
    t.printSchema();
    te.createTemporaryView("users", t);
    */
    Table res = te.sqlQuery("SELECT userId, `count`, proctime," +
        " LAG(`count`, 1, 1) OVER w AS prev_quantity" +
        " FROM users" +
        " WINDOW w AS (ORDER BY proctime)");
    te.toChangelogStream(res).print();
}
{code}
I work with Kafka and have built various ETL jobs with it; window aggregations all worked fine. But when I started using the LAG function with the Kafka-based code, it throws an exception. What could be the cause? If needed, I can make a project archive with my example (you would need Kafka, for example in Docker).
> SQL with LAG function NullPointerException > -- > > Key: FLINK-31967 > URL: https://issues.apache.org/jira/browse/FLINK-31967 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: padavan >Priority: Major > Attachments: image-2023-04-28-14-46-19-736.png, > image-2023-04-28-15-06-48-184.png, image-2023-04-28-15-14-58-788.png, > image-2023-04-28-15-17-49-144.png > > > I want to make a query with the LAG function. And got Job Exception without > any explanations. 
> > *Code:* > {code:java} > private static void t1_LeadLag(DataStream ds, > StreamExecutionEnvironment env) { > StreamTableEnvironment te = StreamTableEnvironment.create(env); > Table t = te.fromDataStream(ds, > Schema.newBuilder().columnByExpression("proctime", "proctime()").build()); > te.createTemporaryView("users", t); > Table res = te.sqlQuery("SELECT userId, `count`,\n" + > " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS > prev_quantity\n" + > " FROM users"); > te.toChangelogStream(res).print(); > }{code} > > *Input:* > {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"} > > *Exception:* I remove part about basic JobExecutionException and kept the > important(i think) > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) > at > org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245) > at > org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown > Source) > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101) > at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > at > 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at >
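For readers less familiar with LAG, here is a plain-Java sketch of what the two queries in this thread ask for, with no Flink involved. The class and method names are hypothetical. It assumes an offset of 1 and rows that arrive already ordered: `LAG(count)` partitioned by `userId` yields NULL for the first row of each key, while the three-argument form `LAG(count, 1, 1)` substitutes the default `1` instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LagSketch {

    // For each row, returns the previous value seen for the same key,
    // or defaultValue (possibly null) when the key has no earlier row.
    public static List<Integer> lagByKey(int[] keys, int[] values, Integer defaultValue) {
        Map<Integer, Integer> previous = new HashMap<>();
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            out.add(previous.getOrDefault(keys[i], defaultValue));
            previous.put(keys[i], values[i]);   // remember this row for the next one with the same key
        }
        return out;
    }

    public static void main(String[] args) {
        int[] userIds = {3, 3, 1, 3};
        int[] counts  = {0, 5, 7, 2};

        // LAG(count) without a default: first row per key is null.
        System.out.println(lagByKey(userIds, counts, null));  // [null, 0, null, 5]

        // LAG(count, 1, 1): the default value 1 replaces the missing previous row.
        System.out.println(lagByKey(userIds, counts, 1));     // [1, 0, 1, 5]
    }
}
```

The null in the first case is what the result column has to be able to hold, which is why the nullability of `count` matters for this issue.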
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717652#comment-17717652 ] Martijn Visser commented on FLINK-31967: I can't reproduce it. You're using Kafka Source, so it could be your incoming data. Can you create a reproducer with the datagen connector? > SQL with LAG function NullPointerException > -- > > Key: FLINK-31967 > URL: https://issues.apache.org/jira/browse/FLINK-31967 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: padavan >Priority: Major > Attachments: image-2023-04-28-14-46-19-736.png, > image-2023-04-28-15-06-48-184.png, image-2023-04-28-15-14-58-788.png, > image-2023-04-28-15-17-49-144.png > > > I want to make a query with the LAG function. And got Job Exception without > any explanations. > > *Code:* > {code:java} > private static void t1_LeadLag(DataStream ds, > StreamExecutionEnvironment env) { > StreamTableEnvironment te = StreamTableEnvironment.create(env); > Table t = te.fromDataStream(ds, > Schema.newBuilder().columnByExpression("proctime", "proctime()").build()); > te.createTemporaryView("users", t); > Table res = te.sqlQuery("SELECT userId, `count`,\n" + > " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS > prev_quantity\n" + > " FROM users"); > te.toChangelogStream(res).print(); > }{code} > > *Input:* > {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"} > > *Exception:* I remove part about basic JobExecutionException and kept the > important(i think) > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) > at > org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245) > at > org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown > Source) > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101) > 
at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717647#comment-17717647 ]
padavan commented on FLINK-31967: -
[~martijnvisser] I synced everything to the latest version, 1.17.0. Nothing changed; I get the same error.
!image-2023-04-28-15-14-58-788.png!
> SQL with LAG function NullPointerException > -- > > Key: FLINK-31967 > URL: https://issues.apache.org/jira/browse/FLINK-31967 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: padavan >Priority: Major > Attachments: image-2023-04-28-14-46-19-736.png, > image-2023-04-28-15-06-48-184.png, image-2023-04-28-15-14-58-788.png > > > I want to make a query with the LAG function. And got Job Exception without > any explanations. > > *Code:* > {code:java} > private static void t1_LeadLag(DataStream ds, > StreamExecutionEnvironment env) { > StreamTableEnvironment te = StreamTableEnvironment.create(env); > Table t = te.fromDataStream(ds, > Schema.newBuilder().columnByExpression("proctime", "proctime()").build()); > te.createTemporaryView("users", t); > Table res = te.sqlQuery("SELECT userId, `count`,\n" + > " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS > prev_quantity\n" + > " FROM users"); > te.toChangelogStream(res).print(); > }{code} > > *Input:* > {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"} > > *Exception:* I remove part about basic JobExecutionException and kept the > important(i think) > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) > at > org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245) > at > org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown > Source) > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101) > at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source) > 
at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717642#comment-17717642 ]
padavan commented on FLINK-31967: -
[~martijnvisser] Since there is no flink-table-api-1.17.0, I downgraded all packages to the same version, 1.16.1. The error is still the same; nothing has changed :(
!image-2023-04-28-15-06-48-184.png!
> SQL with LAG function NullPointerException > -- > > Key: FLINK-31967 > URL: https://issues.apache.org/jira/browse/FLINK-31967 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: padavan >Priority: Major > Attachments: image-2023-04-28-14-46-19-736.png, > image-2023-04-28-15-06-48-184.png > > > I want to make a query with the LAG function. And got Job Exception without > any explanations. > > *Code:* > {code:java} > private static void t1_LeadLag(DataStream ds, > StreamExecutionEnvironment env) { > StreamTableEnvironment te = StreamTableEnvironment.create(env); > Table t = te.fromDataStream(ds, > Schema.newBuilder().columnByExpression("proctime", "proctime()").build()); > te.createTemporaryView("users", t); > Table res = te.sqlQuery("SELECT userId, `count`,\n" + > " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS > prev_quantity\n" + > " FROM users"); > te.toChangelogStream(res).print(); > }{code} > > *Input:* > {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"} > > *Exception:* I remove part about basic JobExecutionException and kept the > important(i think) > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) > at > org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245) > at > org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown > Source) > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101) > at 
UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717636#comment-17717636 ] Martijn Visser commented on FLINK-31967: [~padavan] You are mixing different versions of Flink (1.17.0, 1.16.1). That won't work. Please make sure that you are all using the same versions. > SQL with LAG function NullPointerException > -- > > Key: FLINK-31967 > URL: https://issues.apache.org/jira/browse/FLINK-31967 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: padavan >Priority: Major > Attachments: image-2023-04-28-14-46-19-736.png > > > I want to make a query with the LAG function. And got Job Exception without > any explanations. > > *Code:* > {code:java} > private static void t1_LeadLag(DataStream ds, > StreamExecutionEnvironment env) { > StreamTableEnvironment te = StreamTableEnvironment.create(env); > Table t = te.fromDataStream(ds, > Schema.newBuilder().columnByExpression("proctime", "proctime()").build()); > te.createTemporaryView("users", t); > Table res = te.sqlQuery("SELECT userId, `count`,\n" + > " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS > prev_quantity\n" + > " FROM users"); > te.toChangelogStream(res).print(); > }{code} > > *Input:* > {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"} > > *Exception:* I remove part about basic JobExecutionException and kept the > important(i think) > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149) > at > org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245) > at > org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown > Source) > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101) > at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source) > at > 
org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92) > at > org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > at > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717628#comment-17717628 ]

padavan commented on FLINK-31967:

[~martijnvisser] /usr/lib/jvm/java-1.11.0-openjdk-amd64/

{code:java}
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Read JSON records from the Kafka topic "x1".
    KafkaSource source = KafkaSource.builder()
            .setBootstrapServers(kafka)
            .setTopics("x1")
            .setGroupId("flink_group")
            .setValueOnlyDeserializer(new JsonConverter())
            .build();

    // Use the record's "dt" field (interpreted as UTC) as the event timestamp.
    WatermarkStrategy strategy = WatermarkStrategy
            .forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((i, timestamp) -> i.dt.toInstant(ZoneOffset.UTC).toEpochMilli());

    DataStream ds = env.fromSource(source, strategy, "Kafka Source");
    t1_LeadLag(ds, env);
    env.execute("Flink Java API Skeleton");
}

private static void t1_LeadLag(DataStream ds, StreamExecutionEnvironment env) {
    StreamTableEnvironment te = StreamTableEnvironment.create(env);
    Table t = te.fromDataStream(ds, Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
    te.createTemporaryView("users", t);
    Table res = te.sqlQuery("SELECT userId, `count`,\n" +
            " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity\n" +
            " FROM users");
    te.toChangelogStream(res).print();
}
{code}
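The timestamp assigner in the comment above converts the record's `dt` field to epoch milliseconds by treating it as UTC. A minimal stand-alone sketch of that conversion follows, assuming `dt` is a `java.time.LocalDateTime` parsed from an ISO-8601 string like the one in the reported input; `EventTimeSketch`, `parseUtc` and `toEpochMillis` are hypothetical names. Note that the LAG query orders by `proctime`, so this event-time value is not what the OVER window sorts on.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class EventTimeSketch {

    // Parse an ISO-8601 timestamp with offset (e.g. a trailing "Z") into a UTC wall-clock value.
    static LocalDateTime parseUtc(String iso) {
        return OffsetDateTime.parse(iso)
                .withOffsetSameInstant(ZoneOffset.UTC)
                .toLocalDateTime();
    }

    // Same conversion as the timestamp assigner: interpret the local value as UTC.
    static long toEpochMillis(LocalDateTime dt) {
        return dt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        String raw = "2023-04-28T07:44:21.551Z"; // value taken from the reported input record
        long millis = toEpochMillis(parseUtc(raw));

        // The round trip agrees with parsing the string directly as an Instant.
        System.out.println(millis == Instant.parse(raw).toEpochMilli());
    }
}
```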
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717552#comment-17717552 ]

Martijn Visser commented on FLINK-31967:

Also, when using:

{code:sql}
CREATE TABLE users (
  userId INT,
  `count` INT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'fields.userId.min' = '1',
  'fields.userId.max' = '10',
  'fields.count.min' = '0',
  'fields.count.max' = '10'
);
{code}

I get results as I would expect.
[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717550#comment-17717550 ]

Martijn Visser commented on FLINK-31967:

I can't reproduce this. I've created the following table:

{code:sql}
CREATE TABLE users (
  userId STRING,
  `count` INT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'fields.count.min' = '0',
  'fields.count.max' = '0'
);
{code}

And then run

{code:sql}
SELECT userId, `count`,
  LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity
FROM users;
{code}

On Flink 1.17.0. There's no NPE for me. Which Flink version did you use?
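For readers less familiar with the query being tested, the sketch below shows in plain Java what `LAG(count) OVER (PARTITION BY userId ORDER BY proctime)` is expected to return: for each row, the `count` of the previous row with the same `userId`, and NULL for the first row of each partition. It is an illustration of the SQL semantics only, not Flink's implementation; `LagSketch` and `lag` are hypothetical names, and arrival order stands in for processing time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LagSketch {

    // Each row is {userId, count}; rows are given in arrival (processing-time) order.
    // Returns, per input row, the previous count for the same userId, or null if none.
    static List<Integer> lag(List<int[]> rows) {
        Map<Integer, Integer> lastCountByUser = new HashMap<>();
        List<Integer> previous = new ArrayList<>();
        for (int[] row : rows) {
            int userId = row[0];
            int count = row[1];
            previous.add(lastCountByUser.get(userId)); // null for the first row of a partition
            lastCountByUser.put(userId, count);
        }
        return previous;
    }

    public static void main(String[] args) {
        List<int[]> rows = Arrays.asList(
                new int[] {3, 0},
                new int[] {3, 5},
                new int[] {4, 7},
                new int[] {3, 2});
        System.out.println(lag(rows)); // prints [null, 0, null, 5]
    }
}
```

The first row of every partition yields null, so any consumer of the LAG result or of its intermediate state has to tolerate a missing previous value.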