[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-05-30 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727761#comment-17727761
 ] 

Shengkai Fang commented on FLINK-31967:
---

[~padavan] Yes, the fix has already been merged into master, 1.17 and 1.16. It 
will be included in a future bug-fix release for 1.17. For now, you need to 
build the latest 1.17 branch yourself. You are welcome to report any new 
problems here.

> SQL with LAG function NullPointerException
> --
>
> Key: FLINK-31967
> URL: https://issues.apache.org/jira/browse/FLINK-31967
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: padavan
>Assignee: Haojin Wang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-04-28-14-46-19-736.png, 
> image-2023-04-28-15-06-48-184.png, image-2023-04-28-15-14-58-788.png, 
> image-2023-04-28-15-17-49-144.png, image-2023-04-28-17-06-20-737.png, 
> simpleFlinkKafkaLag.zip
>
>
> I want to run a query with the LAG function, but I get a job exception 
> without any explanation.
>  
> *Code:*
> {code:java}
> private static void t1_LeadLag(DataStream ds, StreamExecutionEnvironment env) {
>     StreamTableEnvironment te = StreamTableEnvironment.create(env);
>     // Add a processing-time attribute so the OVER window can order by it.
>     Table t = te.fromDataStream(ds,
>             Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
>     te.createTemporaryView("users", t);
>     // LAG returns NULL for the first row of each partition.
>     Table res = te.sqlQuery("SELECT userId, `count`,\n" +
>             " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity\n" +
>             " FROM users");
>     te.toChangelogStream(res).print();
> }{code}
>  
> *Input:*
> {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"}
>  
> *Exception:* I removed the part about the basic JobExecutionException and 
> kept the part that I think is important.
> {code:java}
> Caused by: java.lang.NullPointerException
> at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
> at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
> at org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown Source)
> at org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
> at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source)
> at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92)
> at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.base/java.lang.Thread.run(Thread.java:829){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-05-27 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726834#comment-17726834
 ] 

padavan commented on FLINK-31967:
-

[~fsk119] Is it already in master? 

How can I check that it works? Will new versions of the Java packages be 
released, e.g. 1.17.2? 



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-05-24 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726029#comment-17726029
 ] 

Shengkai Fang commented on FLINK-31967:
---

Merged into master: ac6aedbf0f35ba9734108a3c347e649bbf231c62

Merged into release-1.17: (TODO)



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-05-18 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17723826#comment-17723826
 ] 

padavan commented on FLINK-31967:
-

[~fsk119] [~pyro] 

This looks like basic functionality that is often used. It's very strange that 
it still doesn't work. I hope to get this fixed as soon as possible (y)



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-05-15 Thread Haojin Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722665#comment-17722665
 ] 

Haojin Wang commented on FLINK-31967:
-

This problem is caused by a Java primitive type (`int` cannot hold null). I 
will propose a PR to fix it.
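
A minimal sketch of the failure mode described above, in plain Java rather than Flink code. It assumes that the value held for the previous row is null when the first row of a partition arrives and that it is then read into a primitive `int`; the class and method names are hypothetical.

```java
public class UnboxingNpeDemo {

    // Returns true if converting the boxed value to a primitive int
    // throws a NullPointerException.
    public static boolean unboxingThrows(Integer boxed) {
        try {
            int primitive = boxed; // auto-unboxing calls boxed.intValue()
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Integer previous = null; // no previous row exists yet
        System.out.println(unboxingThrows(previous));
        System.out.println(unboxingThrows(0));
    }
}
```

A nullable `Integer` can represent "no previous row", whereas a primitive `int` has no way to express it, which is consistent with the NullPointerException at the top of the reported stack trace.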



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-05-11 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721969#comment-17721969
 ] 

Shengkai Fang commented on FLINK-31967:
---

[~padavan] Sure. There will be a PR to fix this soon. 



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-05-11 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721936#comment-17721936
 ] 

padavan commented on FLINK-31967:
-

[~fsk119] It works with Integer. 

I think it is at least unexpected behavior that int != Integer. Will there be 
a fix, or will you just improve the error output with a description?



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-05-11 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721756#comment-17721756
 ] 

Shengkai Fang commented on FLINK-31967:
---

[~padavan] After investigation, it turns out that the type inference for the 
LAG function is not correct. As a quick fix, you can change the type of 
`count` in UserModel to `Integer`. 
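
The quick fix above could look roughly like the sketch below. The UserModel source is not shown in this thread, so this is an assumption: the field names are taken from the sample input in the issue, and the types of `userId` and `dt` are guesses.

```java
// Hypothetical POJO; only the type of `count` is the point here.
public class UserModel {
    public int userId;
    public Integer count; // boxed type instead of primitive int (the workaround)
    public String dt;     // assumed to be kept as an ISO-8601 string

    // Public no-argument constructor, as expected for a POJO.
    public UserModel() {}

    public UserModel(int userId, Integer count, String dt) {
        this.userId = userId;
        this.count = count;
        this.dt = dt;
    }

    public static void main(String[] args) {
        UserModel u = new UserModel(3, 0, "2023-04-28T07:44:21.551Z");
        System.out.println(u.userId + " " + u.count + " " + u.dt);
    }
}
```

With a boxed field the column can carry null, which matches the fact that LAG has no previous value for the first row of each partition.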



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-05-07 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720303#comment-17720303
 ] 

padavan commented on FLINK-31967:
-

[~martijnvisser] [~jark] [~lincoln.86xy] 

If you need anything else, let me know and I'll do it. :)



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-04-28 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717688#comment-17717688
 ] 

padavan commented on FLINK-31967:
-

[~martijnvisser] 

I've attached a project:

[^simpleFlinkKafkaLag.zip]

It contains two functions: the datagen one works, the Kafka one throws the exception.

!image-2023-04-28-17-06-20-737.png!

Example Kafka input:
{code:java}
{"userId":1,"count":5,"dt":"2023-04-28T14:02:23.113Z"}{code}
 



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-04-28 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717675#comment-17717675
 ] 

Martijn Visser commented on FLINK-31967:


That would be great for debugging, yes. 



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-04-28 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717672#comment-17717672
 ] 

padavan commented on FLINK-31967:
-

[~martijnvisser] 

Wow, interesting: if I use DataGen, everything works fine:
{code:java}
    private static void t1_LeadLag(StreamExecutionEnvironment env) {
        StreamTableEnvironment te = StreamTableEnvironment.create(env);

        te.executeSql(
                "CREATE TABLE users (\n" +
                        "    userId INT,\n" +
                        "    `count` INT,\n" +
                        "    proctime AS PROCTIME()\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'fields.userId.min' = '1',\n" +
                        "  'fields.userId.max' = '10',\n" +
                        "  'fields.count.min' = '0',\n" +
                        "  'fields.count.max' = '10'\n" +
                        ");");

        /*
        Table t = te.fromDataStream(ds,
                Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
        t.printSchema();
        te.createTemporaryView("users", t);
        */

        Table res = te.sqlQuery("SELECT userId, `count`, proctime," +
                " LAG(`count`, 1, 1) OVER w AS prev_quantity" +
                " FROM users" +
                " WINDOW w AS (ORDER BY proctime)");

        te.toChangelogStream(res).print();
    }
{code}
 

I work with Kafka and have built various ETL jobs with it; window aggregations all worked fine.

But when I started using LAG with the code above, it throws an exception.

What could it be?
If needed, I can make a project archive with my example (but you would need 
Kafka, for example in Docker).

 

 


[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-04-28 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717652#comment-17717652
 ] 

Martijn Visser commented on FLINK-31967:


I can't reproduce it. You're using Kafka Source, so it could be your incoming 
data. Can you create a reproducer with the datagen connector? 
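
The top frame of the reported trace, GenericRowData.getInt, is consistent with a null value being read as a primitive int. The following is a minimal plain-Java sketch of that failure mode only, not Flink code: it assumes fields are held as boxed objects in an array, and the names `Row` and `readNullableInt` are hypothetical stand-ins invented for illustration. It does not claim to show where the null comes from in this issue.

```java
/** Plain-Java illustration of unboxing a null field; not Flink code. */
public class NullUnboxingSketch {

    /** Hypothetical row container that keeps its fields as boxed objects. */
    static final class Row {
        private final Object[] fields;

        Row(Object... fields) {
            this.fields = fields;
        }

        /** Returns true if the field at the given position holds null. */
        boolean isNullAt(int pos) {
            return fields[pos] == null;
        }

        /** Unboxes without a null check: throws NullPointerException for a null field. */
        int getInt(int pos) {
            return (int) fields[pos];
        }
    }

    /** Null-safe read: checks for null first and only then unboxes. */
    static Integer readNullableInt(Row row, int pos) {
        if (row.isNullAt(pos)) {
            return null;
        }
        return row.getInt(pos);
    }

    public static void main(String[] args) {
        // Field 0 holds a value, field 1 is null (for example, "no previous row yet").
        Row row = new Row(3, null);

        System.out.println("safe read of field 0: " + readNullableInt(row, 0));
        System.out.println("safe read of field 1: " + readNullableInt(row, 1));

        try {
            row.getInt(1); // unboxing null
        } catch (NullPointerException e) {
            System.out.println("unchecked read of field 1 threw NullPointerException");
        }
    }
}
```

If a field is declared as non-nullable, a reader may reasonably skip the null check, so a null that reaches such a field would surface exactly as an unboxing error rather than as a descriptive message.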



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-04-28 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717647#comment-17717647
 ] 

padavan commented on FLINK-31967:
-

[~martijnvisser] I synced everything to the latest version, 1.17.0.

Nothing changed, same error.

!image-2023-04-28-15-14-58-788.png!



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-04-28 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717642#comment-17717642
 ] 

padavan commented on FLINK-31967:
-

[~martijnvisser] 

Since there is no flink-table-api-1.17.0, I downgraded all packages to the latest 
common version, 1.16.1. The error is still the same; nothing has changed :(

!image-2023-04-28-15-06-48-184.png!



[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-04-28 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717636#comment-17717636
 ] 

Martijn Visser commented on FLINK-31967:


[~padavan] You are mixing different versions of Flink (1.17.0, 1.16.1). That 
won't work. Please make sure that all your Flink dependencies use the same version. 
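
A quick way to spot such a mix is to group the Flink artifacts of a project by version. The sketch below is a hypothetical helper, not part of Flink or any build tool: it assumes dependencies are given as "group:artifact:version" strings, and the coordinates in `main` are illustrative rather than taken from the reporter's project. Connectors that are released separately from Flink may follow their own version scheme, so a reported mix still needs a manual look.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical helper that flags mixed versions among org.apache.flink artifacts. */
public class FlinkVersionCheckSketch {

    /** Groups Flink artifact names by version; other entries are ignored. */
    static Map<String, Set<String>> flinkArtifactsByVersion(List<String> coordinates) {
        Map<String, Set<String>> byVersion = new TreeMap<>();
        for (String coordinate : coordinates) {
            String[] parts = coordinate.split(":");
            if (parts.length != 3 || !parts[0].equals("org.apache.flink")) {
                continue; // not a Flink artifact, or not in the assumed format
            }
            byVersion.computeIfAbsent(parts[2], v -> new TreeSet<>()).add(parts[1]);
        }
        return byVersion;
    }

    /** Returns true if more than one distinct version is used by Flink artifacts. */
    static boolean hasMixedVersions(List<String> coordinates) {
        return flinkArtifactsByVersion(coordinates).size() > 1;
    }

    public static void main(String[] args) {
        // Illustrative dependency list, written by hand for this sketch.
        List<String> deps = List.of(
                "org.apache.flink:flink-streaming-java:1.17.0",
                "org.apache.flink:flink-table-api-java-bridge:1.16.1",
                "org.apache.flink:flink-connector-kafka:1.17.0",
                "org.slf4j:slf4j-api:1.7.36");

        System.out.println(flinkArtifactsByVersion(deps));
        System.out.println("mixed versions: " + hasMixedVersions(deps));
    }
}
```

In a Maven or Gradle build, the usual way to avoid the problem in the first place is to define the Flink version once and reference it from every Flink dependency.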

> SQL with LAG function NullPointerException
> --
>
> Key: FLINK-31967
> URL: https://issues.apache.org/jira/browse/FLINK-31967
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: padavan
>Priority: Major
> Attachments: image-2023-04-28-14-46-19-736.png
>
>
> I want to run a query with the LAG function, but I got a job exception without 
> any explanation.
>
> *Code:*
> {code:java}
> private static void t1_LeadLag(DataStream ds, StreamExecutionEnvironment env) {
>     StreamTableEnvironment te = StreamTableEnvironment.create(env);
>     Table t = te.fromDataStream(ds,
>             Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
>     te.createTemporaryView("users", t);
>     Table res = te.sqlQuery("SELECT userId, `count`,\n" +
>             " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity\n" +
>             " FROM users");
>     te.toChangelogStream(res).print();
> }
> {code}
>
> *Input:*
> {"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"}
>
> *Exception:* I removed the part about the basic JobExecutionException and kept the 
> important part (I think).
> {code:java}
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
>     at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
>     at org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown Source)
>     at org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
>     at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source)
>     at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92)
>     at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42)
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>     at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> {code}





[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-04-28 Thread padavan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717628#comment-17717628
 ] 

padavan commented on FLINK-31967:
-

[~martijnvisser] 

/usr/lib/jvm/java-1.11.0-openjdk-amd64/

 

 
{code:java}
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // Kafka source that reads JSON records from topic "x1"
    KafkaSource source = KafkaSource.builder()
            .setBootstrapServers(kafka)
            .setTopics("x1")
            .setGroupId("flink_group")
            .setValueOnlyDeserializer(new JsonConverter())
            .build();

    // Event-time watermarks with 10 seconds of allowed out-of-orderness,
    // taking the timestamp from the record's dt field
    WatermarkStrategy strategy = WatermarkStrategy
            .forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((i, timestamp) ->
                    i.dt.toInstant(ZoneOffset.UTC).toEpochMilli());

    DataStream ds = env.fromSource(source, strategy, "Kafka Source");

    t1_LeadLag(ds, env);
    env.execute("Flink Java API Skeleton");
}

private static void t1_LeadLag(DataStream ds, StreamExecutionEnvironment env) {
    StreamTableEnvironment te = StreamTableEnvironment.create(env);
    // Add a processing-time attribute so the OVER window can order by it
    Table t = te.fromDataStream(ds,
            Schema.newBuilder().columnByExpression("proctime", "proctime()").build());

    te.createTemporaryView("users", t);

    Table res = te.sqlQuery("SELECT userId, `count`,\n" +
            " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity\n" +
            " FROM users");

    te.toChangelogStream(res).print();
}
{code}
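
The timestamp assigner in the job above can be tried in isolation with plain JDK classes. This is only a sketch: it assumes that dt is a java.time.LocalDateTime (the call toInstant(ZoneOffset.UTC) suggests this, but the model class is not shown in the thread), and the class and method names are made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

class TimestampSketch {
    /** Mirrors the lambda body: interpret a zone-less LocalDateTime as UTC and return epoch milliseconds. */
    static long toEpochMillis(LocalDateTime dt) {
        return dt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        // The sample input carries a trailing "Z", so it is parsed with an offset first.
        LocalDateTime dt = OffsetDateTime.parse("2023-04-28T07:44:21.551Z").toLocalDateTime();
        System.out.println(toEpochMillis(dt)); // prints 1682667861551
    }
}
```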
 

 

 

 


[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-04-28 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717552#comment-17717552
 ] 

Martijn Visser commented on FLINK-31967:


Also, when using:

{code:sql}
CREATE TABLE users (
  userId INT,
  `count` INT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'fields.userId.min' = '1',
  'fields.userId.max' = '10',
  'fields.count.min' = '0',
  'fields.count.max' = '10'
);
{code}

I get results as I would expect. 
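
For readers unfamiliar with LAG, the expected result shape for the query from the issue description can be sketched in plain Java. This is not Flink's implementation; the class name, the helper lagByUser, and the sample rows are invented for illustration. It shows that the first row of each userId partition has no predecessor, so prev_quantity is NULL there. Whether that NULL is connected to the NullPointerException in the reported trace is not established in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LagSketch {
    /**
     * For each {userId, count} row in arrival order, returns the previous count
     * seen for the same userId, or null if the row is the first of its partition.
     */
    static List<Integer> lagByUser(int[][] rows) {
        Map<Integer, Integer> lastCount = new HashMap<>();
        List<Integer> result = new ArrayList<>();
        for (int[] row : rows) {
            int userId = row[0];
            int count = row[1];
            // put() returns the previous value for this key, or null if there was none
            result.add(lastCount.put(userId, count));
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] rows = {{3, 0}, {3, 5}, {7, 2}, {3, 1}};
        System.out.println(lagByUser(rows)); // prints [null, 0, null, 5]
    }
}
```

The result list uses the boxed Integer type on purpose: assigning one of its null entries to a primitive int would itself throw a NullPointerException.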






[jira] [Commented] (FLINK-31967) SQL with LAG function NullPointerException

2023-04-28 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717550#comment-17717550
 ] 

Martijn Visser commented on FLINK-31967:


I can't reproduce this. I've created the following table:

{code:sql}
CREATE TABLE users (
  userId STRING,
  `count` INT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'fields.count.min' = '0',
  'fields.count.max' = '0'
);
{code}

I then ran the following query on Flink 1.17.0:

{code:sql}
SELECT userId, `count`,
  LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity
FROM users;
{code}

There's no NPE for me. Which Flink version did you use?



