[jira] [Assigned] (FLINK-9300) Improve error message when in-memory state is too large
[ https://issues.apache.org/jira/browse/FLINK-9300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9300: --- Assignee: vinoyang > Improve error message when in-memory state is too large > --- > > Key: FLINK-9300 > URL: https://issues.apache.org/jira/browse/FLINK-9300 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: Ken Krugler >Assignee: vinoyang >Priority: Minor > > Currently in the {{MemCheckpointStreamFactory.checkSize()}} method, it can > throw an {{IOException}} via: > {code:java} > throw new IOException( > "Size of the state is larger than the maximum permitted memory-backed state. > Size=" > + size + " , maxSize=" + maxSize > + " . Consider using a different state backend, like the File System State > backend.");{code} > But this will happen even if you’re using the File System State backend. > This came up here: > [https://stackoverflow.com/questions/50149005/ioexception-size-of-the-state-is-larger-than-the-maximum-permitted-memory-backe] > We could change the message to be: > {quote}Please consider increasing the maximum permitted memory size, > increasing the task manager parallelism, or using a non-memory-based state > backend such as RocksDB. > {quote} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
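The suggested wording could be folded into the same size guard. Below is a self-contained, illustrative sketch of a `checkSize`-style method using the proposed message — class and method names are stand-ins, not the actual Flink patch:

```java
import java.io.IOException;

// Illustrative stand-in for MemCheckpointStreamFactory.checkSize() with the
// reworded error message proposed in FLINK-9300. Not the actual Flink code.
public class CheckSizeSketch {
    public static void checkSize(int size, int maxSize) throws IOException {
        if (size > maxSize) {
            throw new IOException(
                "Size of the state is larger than the maximum permitted memory-backed state. Size="
                    + size + ", maxSize=" + maxSize
                    + ". Please consider increasing the maximum permitted memory size,"
                    + " increasing the task manager parallelism, or using a"
                    + " non-memory-based state backend such as RocksDB.");
        }
    }
}
```

The key difference from the current message is that it no longer recommends the File System State backend, which also buffers state in memory before checkpointing and can hit the same limit.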
[jira] [Assigned] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors
[ https://issues.apache.org/jira/browse/FLINK-9299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9299: --- Assignee: vinoyang > ProcessWindowFunction documentation Java examples have errors > - > > Key: FLINK-9299 > URL: https://issues.apache.org/jira/browse/FLINK-9299 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.4.2 >Reporter: Ken Krugler >Assignee: vinoyang >Priority: Minor > > In looking at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation], > I noticed a few errors... > * "This allows to incrementally compute windows" should be "This allows it > to incrementally compute windows" > * DataStream<Tuple2<String, Long> input = ...; should be > DataStream<Tuple2<String, Long>> input = ...; > * The getResult() method needs to cast one of the accumulator values to a > double, if that's what it is going to return. > * MyProcessWindowFunction needs to extend, not implement > ProcessWindowFunction > * MyProcessWindowFunction needs to implement a process() method, not an > apply() method. > * The call to .timeWindow takes a Time parameter, not a window assigner. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
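The getResult() cast mentioned above can be shown with a self-contained stand-in for the docs' average accumulator — plain Java, not Flink's actual AggregateFunction API:

```java
// Stand-in for the docs' (sum, count) average accumulator; not Flink's API.
// The point from the issue: getResult() must cast before dividing,
// otherwise long/long division truncates the average.
public class AverageAccumulator {
    long sum;
    long count;

    public AverageAccumulator add(long value) {
        sum += value;
        count += 1;
        return this;
    }

    public double getResult() {
        // Without the (double) cast this would be integer division.
        return (double) sum / count;
    }
}
```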
[jira] [Commented] (FLINK-7001) Improve performance of Sliding Time Window with pane optimization
[ https://issues.apache.org/jira/browse/FLINK-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464544#comment-16464544 ] Shuyi Chen commented on FLINK-7001: --- Hi [~pgrulich], the paper is a nice read. And the technique applies to Tumble, Sliding & Session windows, which is a good win, and the evaluation result looks good. Also, it seems you already have an implementation for Scotty using Apache Flink based on the paper. Maybe, you and [~jark] can share more, for each approach, about the detailed design, pros and cons, and we can discuss them here? > Improve performance of Sliding Time Window with pane optimization > - > > Key: FLINK-7001 > URL: https://issues.apache.org/jira/browse/FLINK-7001 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > > Currently, the implementation of time-based sliding windows treats each > window individually and replicates records to each window. For a window of 10 > minute size that slides by 1 second the data is replicated 600 fold (10 > minutes / 1 second). We can optimize sliding windows by dividing them into > panes (aligned with the slide), so that we can avoid record duplication and > leverage the checkpoint. > I will attach a more detailed design doc to the issue. > The following issues are similar to this issue: FLINK-5387, FLINK-6990 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
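The pane idea in the issue description can be sketched in a few lines of self-contained Java (illustrative only, not the proposed Flink implementation): each record is written once into a slide-aligned pane, and a sliding window's result combines the panes it covers, instead of replicating the record into every overlapping window.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative pane-based sliding-window sum. One write per record into a
// pane keyed by its slide-aligned start time; a window result merges the
// panes it covers, avoiding the 600x record replication from the issue.
public class PaneSketch {
    final long size;   // window size, assumed a multiple of the slide
    final long slide;  // pane width == window slide
    final Map<Long, Long> paneSums = new HashMap<>();

    public PaneSketch(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    public void add(long timestamp, long value) {
        long paneStart = timestamp - (timestamp % slide);
        paneSums.merge(paneStart, value, Long::sum); // one write per record
    }

    public long windowSum(long windowStart) {
        long sum = 0;
        for (long p = windowStart; p < windowStart + size; p += slide) {
            sum += paneSums.getOrDefault(p, 0L);     // combine covered panes
        }
        return sum;
    }
}
```

For a 10-minute window sliding by 1 second, each record lands in exactly one of 600 panes instead of being copied into 600 window buffers.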
[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization
[ https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464530#comment-16464530 ] ASF GitHub Bot commented on FLINK-8533: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5427 @StephanEwen thanks again for the feedback, which I took to heart and simplified the hook. It now has a `reset ` method that is called only in the special case. I will refactor the thread context code in a separate PR. > Support MasterTriggerRestoreHook state reinitialization > --- > > Key: FLINK-8533 > URL: https://issues.apache.org/jira/browse/FLINK-8533 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Major > > {{MasterTriggerRestoreHook}} enables coordination with an external system for > taking or restoring checkpoints. When execution is restarted from a > checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the > external system state. There's an edge case where the external state is not > adequately reinitialized, that is when execution fails _before the first > checkpoint_. In that case, the hook is not invoked and has no opportunity to > restore the external state to initial conditions. > The impact is a loss of exactly-once semantics in this case. For example, in > the Pravega source function, the reader group state (e.g. stream position > data) is stored externally. In the normal restore case, the reader group > state is forcibly rewound to the checkpointed position. In the edge case > where no checkpoint has yet been successful, the reader group state is not > rewound and consequently some amount of stream data is not reprocessed. > A possible fix would be to introduce an {{initializeState}} method on the > hook interface. Similar to {{CheckpointedFunction::initializeState}}, this > method would be invoked unconditionally upon hook initialization. 
The Pravega > hook would, for example, initialize or forcibly reinitialize the reader group > state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
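A minimal sketch of the proposed shape of the fix, with illustrative names (this is not Flink's actual `MasterTriggerRestoreHook` signature): the hook gains an unconditional initialization callback alongside the checkpoint-restore path, covering the "failure before the first checkpoint" edge case.

```java
// Illustrative hook interface; names are hypothetical, not Flink's API.
public interface RestoreHookSketch<T> {
    // Invoked unconditionally when the hook is (re)initialized, so external
    // state is reset even if execution failed before the first checkpoint.
    default void reset() {}

    // Invoked only when restoring from an actual checkpoint.
    void restoreCheckpoint(long checkpointId, T checkpointData);
}
```

A Pravega-style implementation would forcibly rewind reader group state in both callbacks: to the checkpointed position in `restoreCheckpoint`, and to initial conditions in `reset`.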
[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5427 @StephanEwen thanks again for the feedback, which I took to heart and simplified the hook. It now has a `reset` method that is called only in the special case. I will refactor the thread context code in a separate PR. ---
[jira] [Created] (FLINK-9300) Improve error message when in-memory state is too large
Ken Krugler created FLINK-9300: -- Summary: Improve error message when in-memory state is too large Key: FLINK-9300 URL: https://issues.apache.org/jira/browse/FLINK-9300 Project: Flink Issue Type: Improvement Affects Versions: 1.4.2 Reporter: Ken Krugler Currently in the {{MemCheckpointStreamFactory.checkSize()}} method, it can throw an {{IOException}} via: {code:java} throw new IOException( "Size of the state is larger than the maximum permitted memory-backed state. Size=" + size + " , maxSize=" + maxSize + " . Consider using a different state backend, like the File System State backend.");{code} But this will happen even if you’re using the File System State backend. This came up here: [https://stackoverflow.com/questions/50149005/ioexception-size-of-the-state-is-larger-than-the-maximum-permitted-memory-backe] We could change the message to be: {quote}Please consider increasing the maximum permitted memory size, increasing the task manager parallelism, or using a non-memory-based state backend such as RocksDB. {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors
Ken Krugler created FLINK-9299: -- Summary: ProcessWindowFunction documentation Java examples have errors Key: FLINK-9299 URL: https://issues.apache.org/jira/browse/FLINK-9299 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.4.2 Reporter: Ken Krugler In looking at [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation], I noticed a few errors... * "This allows to incrementally compute windows" should be "This allows it to incrementally compute windows" * DataStream<Tuple2<String, Long> input = ...; should be DataStream<Tuple2<String, Long>> input = ...; * The getResult() method needs to cast one of the accumulator values to a double, if that's what it is going to return. * MyProcessWindowFunction needs to extend, not implement ProcessWindowFunction * MyProcessWindowFunction needs to implement a process() method, not an apply() method. * The call to .timeWindow takes a Time parameter, not a window assigner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval
[ https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464434#comment-16464434 ] ASF GitHub Bot commented on FLINK-9138: --- Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/5860 Thanks for reviewing @fhueske @aljoscha and @kl0u ! I have addressed the latest review comments. Can you PTAL (again) ? > Enhance BucketingSink to also flush data by time interval > - > > Key: FLINK-9138 > URL: https://issues.apache.org/jira/browse/FLINK-9138 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.2 >Reporter: Narayanan Arunachalam >Priority: Major > > BucketingSink now supports flushing data to the file system by size limit and > by period of inactivity. It will be useful to also flush data by a specified > time period. This way, the data will be written out when write throughput is > low but there is no significant time period gaps between the writes. This > reduces ETA for the data in the file system and should help move the > checkpoints faster as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/5860 Thanks for reviewing @fhueske @aljoscha and @kl0u ! I have addressed the latest review comments. Can you PTAL (again) ? ---
[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval
[ https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464404#comment-16464404 ] ASF GitHub Bot commented on FLINK-9138: --- Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/5860#discussion_r186223237 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -473,6 +482,15 @@ private boolean shouldRoll(BucketState bucketState) throws IOException { subtaskIndex, writePosition, batchSize); + } else { + long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); --- End diff -- Updated method signature for `shouldRoll` to include the `currentProcessingTime` > Enhance BucketingSink to also flush data by time interval > - > > Key: FLINK-9138 > URL: https://issues.apache.org/jira/browse/FLINK-9138 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.2 >Reporter: Narayanan Arunachalam >Priority: Major > > BucketingSink now supports flushing data to the file system by size limit and > by period of inactivity. It will be useful to also flush data by a specified > time period. This way, the data will be written out when write throughput is > low but there is no significant time period gaps between the writes. This > reduces ETA for the data in the file system and should help move the > checkpoints faster as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/5860#discussion_r186223237 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -473,6 +482,15 @@ private boolean shouldRoll(BucketState bucketState) throws IOException { subtaskIndex, writePosition, batchSize); + } else { + long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); --- End diff -- Updated method signature for `shouldRoll` to include the `currentProcessingTime` ---
[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/5860#discussion_r186223099 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -908,6 +929,20 @@ private void handlePendingFilesForPreviousCheckpoints(Mappe return this; } + /** +* Sets the roll over interval in milliseconds. +* +* +* When a bucket part file is older than the roll over interval, a new bucket part file is +* started and the old one is closed. The name of the bucket file depends on the {@link Bucketer}. +* +* @param batchRolloverInterval The roll over interval in milliseconds +*/ + public BucketingSink setBatchRolloverInterval(long batchRolloverInterval) { + this.batchRolloverInterval = batchRolloverInterval; + return this; --- End diff -- Added a check for `batchRolloverInterval` to be a positive non-zero value. ---
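Taken together, the two diff fragments above amount to a roll decision like the following self-contained sketch (illustrative, not the actual BucketingSink code): a part file rolls when it exceeds the batch size or is older than a positive rollover interval, with the reviewer-requested positivity check in the setter path.

```java
// Illustrative roll policy combining size-based and time-based rolling,
// echoing the field names in the PR diff; not the actual BucketingSink.
public class RollPolicySketch {
    final long batchSize;
    final long batchRolloverInterval;

    public RollPolicySketch(long batchSize, long batchRolloverInterval) {
        // Mirrors the review comment: the interval must be positive, non-zero.
        if (batchRolloverInterval <= 0) {
            throw new IllegalArgumentException("batchRolloverInterval must be positive");
        }
        this.batchSize = batchSize;
        this.batchRolloverInterval = batchRolloverInterval;
    }

    // currentProcessingTime is passed in, matching the updated shouldRoll
    // signature described in the diff discussion.
    public boolean shouldRoll(long writePosition, long creationTime, long currentProcessingTime) {
        return writePosition > batchSize
            || currentProcessingTime - creationTime > batchRolloverInterval;
    }
}
```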
[jira] [Commented] (FLINK-9138) Enhance BucketingSink to also flush data by time interval
[ https://issues.apache.org/jira/browse/FLINK-9138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464403#comment-16464403 ] ASF GitHub Bot commented on FLINK-9138: --- Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/5860#discussion_r186223099 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -908,6 +929,20 @@ private void handlePendingFilesForPreviousCheckpoints(Mappe return this; } + /** +* Sets the roll over interval in milliseconds. +* +* +* When a bucket part file is older than the roll over interval, a new bucket part file is +* started and the old one is closed. The name of the bucket file depends on the {@link Bucketer}. +* +* @param batchRolloverInterval The roll over interval in milliseconds +*/ + public BucketingSink setBatchRolloverInterval(long batchRolloverInterval) { + this.batchRolloverInterval = batchRolloverInterval; + return this; --- End diff -- Added a check for `batchRolloverInterval` to be a positive non-zero value. > Enhance BucketingSink to also flush data by time interval > - > > Key: FLINK-9138 > URL: https://issues.apache.org/jira/browse/FLINK-9138 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.4.2 >Reporter: Narayanan Arunachalam >Priority: Major > > BucketingSink now supports flushing data to the file system by size limit and > by period of inactivity. It will be useful to also flush data by a specified > time period. This way, the data will be written out when write throughput is > low but there is no significant time period gaps between the writes. This > reduces ETA for the data in the file system and should help move the > checkpoints faster as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9298) setup_quickstart hanging
Alex Chen created FLINK-9298: Summary: setup_quickstart hanging Key: FLINK-9298 URL: https://issues.apache.org/jira/browse/FLINK-9298 Project: Flink Issue Type: Bug Reporter: Alex Chen Well, I followed the tutorial ([https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html]). Regardless, there are several typos (e.g., incorrect log file names), and flink hangs while executing `bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000`, which looks like: ``` ➜ build-target git:(master) bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000 Starting execution of program (hanging) ``` Below are logs ``` ➜ log git:(master) ls -l total 136 -rw-r--r-- 1 wander staff 6189 5 5 04:34 flink-wander-client-localhost.log -rw-r--r-- 1 wander staff 17823 5 5 04:34 flink-wander-standalonesession-0-localhost.log -rw-r--r-- 1 wander staff 0 5 5 04:33 flink-wander-standalonesession-0-localhost.out -rw-r--r-- 1 wander staff 38220 5 5 04:34 flink-wander-taskexecutor-0-localhost.log -rw-r--r-- 1 wander staff 0 5 5 04:33 flink-wander-taskexecutor-0-localhost.out ➜ log git:(master) cat flink-wander-client-localhost.log 2018-05-05 04:34:19,210 INFO org.apache.flink.client.cli.CliFrontend - 2018-05-05 04:34:19,212 INFO org.apache.flink.client.cli.CliFrontend - Starting Command Line Client (Version: 1.6-SNAPSHOT, Rev:5ac4d29, Date:04.05.2018 @ 22:03:32 CST) 2018-05-05 04:34:19,212 INFO org.apache.flink.client.cli.CliFrontend - OS current user: wander 2018-05-05 04:34:19,732 INFO org.apache.flink.client.cli.CliFrontend - Current Hadoop/Kerberos user: wander 2018-05-05 04:34:19,733 INFO org.apache.flink.client.cli.CliFrontend - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.144-b01 2018-05-05 04:34:19,733 INFO org.apache.flink.client.cli.CliFrontend - Maximum heap size: 1820 MiBytes 2018-05-05 04:34:19,733 INFO org.apache.flink.client.cli.CliFrontend - JAVA_HOME: /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home
2018-05-05 04:34:19,735 INFO org.apache.flink.client.cli.CliFrontend - Hadoop version: 2.4.1 2018-05-05 04:34:19,735 INFO org.apache.flink.client.cli.CliFrontend - JVM Options: 2018-05-05 04:34:19,735 INFO org.apache.flink.client.cli.CliFrontend - -Dlog.file=/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/log/flink-wander-client-localhost.log 2018-05-05 04:34:19,736 INFO org.apache.flink.client.cli.CliFrontend - -Dlog4j.configuration=file:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/log4j-cli.properties 2018-05-05 04:34:19,736 INFO org.apache.flink.client.cli.CliFrontend - -Dlogback.configurationFile=file:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/logback.xml 2018-05-05 04:34:19,736 INFO org.apache.flink.client.cli.CliFrontend - Program Arguments: 2018-05-05 04:34:19,736 INFO org.apache.flink.client.cli.CliFrontend - run 2018-05-05 04:34:19,736 INFO org.apache.flink.client.cli.CliFrontend - examples/streaming/SocketWindowWordCount.jar 2018-05-05 04:34:19,736 INFO org.apache.flink.client.cli.CliFrontend - --port 2018-05-05 04:34:19,736 INFO org.apache.flink.client.cli.CliFrontend - 9000 2018-05-05 04:34:19,736 INFO org.apache.flink.client.cli.CliFrontend - Classpath: 
:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/lib/flink-dist_2.11-1.6-SNAPSHOT.jar:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/lib/flink-python_2.11-1.6-SNAPSHOT.jar:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/lib/flink-shaded-hadoop2-uber-1.6-SNAPSHOT.jar:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/lib/log4j-1.2.17.jar:/Users/wander/Documents/Apocalypse/repos/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/lib/slf4j-log4j12-1.7.7.jar::: 2018-05-05 04:34:19,736 INFO org.apache.flink.client.cli.CliFrontend -
[jira] [Commented] (FLINK-7001) Improve performance of Sliding Time Window with pane optimization
[ https://issues.apache.org/jira/browse/FLINK-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464248#comment-16464248 ] Philipp Grulich commented on FLINK-7001: Hi [~walterddr], we recently published this paper about our new Window Operator: http://www.user.tu-berlin.de/powibol/assets/publications/traub-scotty-icde-2018.pdf It would definitely provide a huge performance improvement over the current Flink implementation. I don't think a FLIP has been written yet. Best, Philipp > Improve performance of Sliding Time Window with pane optimization > - > > Key: FLINK-7001 > URL: https://issues.apache.org/jira/browse/FLINK-7001 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > > Currently, the implementation of time-based sliding windows treats each > window individually and replicates records to each window. For a window of 10 > minute size that slides by 1 second the data is replicated 600 fold (10 > minutes / 1 second). We can optimize sliding windows by dividing them into > panes (aligned with the slide), so that we can avoid record duplication and > leverage the checkpoint. > I will attach a more detailed design doc to the issue. > The following issues are similar to this issue: FLINK-5387, FLINK-6990 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7001) Improve performance of Sliding Time Window with pane optimization
[ https://issues.apache.org/jira/browse/FLINK-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464239#comment-16464239 ] Rong Rong commented on FLINK-7001: -- Hi [~jark], Is there a FLIP being proposed based on the pain points discussed here? This inefficiency in windowing has been observed more and more frequently in our day-to-day operations lately. We would like to contribute to the design and the implementation of this improvement if possible :-) Thanks, Rong > Improve performance of Sliding Time Window with pane optimization > - > > Key: FLINK-7001 > URL: https://issues.apache.org/jira/browse/FLINK-7001 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > > Currently, the implementation of time-based sliding windows treats each > window individually and replicates records to each window. For a window of 10 > minute size that slides by 1 second the data is replicated 600 fold (10 > minutes / 1 second). We can optimize sliding windows by dividing them into > panes (aligned with the slide), so that we can avoid record duplication and > leverage the checkpoint. > I will attach a more detailed design doc to the issue. > The following issues are similar to this issue: FLINK-5387, FLINK-6990 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #:
Github user zentol commented on the pull request: https://github.com/apache/flink/commit/c8fa8d025684c2225824c54a7285bbfdec7cfddc#commitcomment-28859963 In flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java: In flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java on line 25: you added spaces here ---
[jira] [Commented] (FLINK-9292) Remove TypeInfoParser
[ https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464135#comment-16464135 ] Stephan Ewen commented on FLINK-9292: - Feel free to take over the second half of this issue, if you want... > Remove TypeInfoParser > - > > Key: FLINK-9292 > URL: https://issues.apache.org/jira/browse/FLINK-9292 > Project: Flink > Issue Type: Task > Components: Core >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > > The {{TypeInfoParser}} has been deprecated, in favor of the {{TypeHint}}. > Because the TypeInfoParser is also not working correctly with respect to > classloading, we should remove it. Users still find the class, try to use it, > and run into problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9292) Remove TypeInfoParser
[ https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464134#comment-16464134 ] Stephan Ewen commented on FLINK-9292: - I have merged the first part of removing the {{TypeInfoParser}}, which removes it from various tests and the DataSet API. > Remove TypeInfoParser > - > > Key: FLINK-9292 > URL: https://issues.apache.org/jira/browse/FLINK-9292 > Project: Flink > Issue Type: Task > Components: Core >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > > The {{TypeInfoParser}} has been deprecated, in favor of the {{TypeHint}}. > Because the TypeInfoParser is also not working correctly with respect to > classloading, we should remove it. Users still find the class, try to use it, > and run into problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5940 LOL. I think I found a way: 1. Rebase #3764 over to current master; 2. Rebase this branch to the rebased #3764; 3. Make changes on top :-) ---
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464071#comment-16464071 ] ASF GitHub Bot commented on FLINK-8690: --- Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5940 LOL. I think I found a way: 1. Rebase #3764 over to current master; 2. Rebase this branch to the rebased #3764; 3. Make changes on top :-) > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464063#comment-16464063 ] ASF GitHub Bot commented on FLINK-8690: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5940 Hmmm, good point. The discussion would be lost. How about I put your changes on top of Haohui's changes before merging? > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5940 Hmmm, good point. The discussion would be lost. How about I put your changes on top of Haohui's changes before merging? ---
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464039#comment-16464039 ] ASF GitHub Bot commented on FLINK-8690: --- Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5940 Thanks @suez1224 @fhueske for the comments. I will change them accordingly. Yes I copied a lot of test cases from @haohui's PR for my own testing. I can definitely put it on top given the runtime support is already merged in #. Procedure-wise question: should I rebase his commit then add my change on top, then attached to this PR? I am not sure if there's a clever way to both preserve the discussion in this thread and rebase on top of his change. > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...
Github user walterddr commented on the issue: https://github.com/apache/flink/pull/5940 Thanks @suez1224 @fhueske for the comments. I will change them accordingly. Yes I copied a lot of test cases from @haohui's PR for my own testing. I can definitely put it on top given the runtime support is already merged in #. Procedure-wise question: should I rebase his commit then add my change on top, then attached to this PR? I am not sure if there's a clever way to both preserve the discussion in this thread and rebase on top of his change. ---
[jira] [Created] (FLINK-9297) Make size of JobManager's ioExecutor configurable
Jelmer Kuperus created FLINK-9297: - Summary: Make size of JobManager's ioExecutor configurable Key: FLINK-9297 URL: https://issues.apache.org/jira/browse/FLINK-9297 Project: Flink Issue Type: Improvement Components: JobManager Affects Versions: 1.4.2, 1.4.1, 1.4.0 Reporter: Jelmer Kuperus With Flink 1.4.0, cleaning of checkpoints is handled on the job manager; the SharedStateRegistry uses the job manager's [io executor|https://github.com/apache/flink/blob/e5ed2fbc4a31808bea8be89b371bbe4269288003/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1999] as the executor to schedule disposals of checkpoint state on. This executor has a fixed size equal to the number of cpu cores. When a lot of small files are created this may not be enough. It would be good to make this setting configurable. Initializing an executor service with the number of cpu's on the system makes sense for cpu-intensive tasks, less so for io-based workloads. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
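The requested change could look like the following sketch: read an optional pool size from configuration and fall back to the CPU count as today. The config key name here is hypothetical, not an actual Flink option.

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch of a configurable io-executor pool size.
// The "jobmanager.io-executor.pool-size" key is hypothetical.
public class IoExecutorSketch {
    public static int poolSize(Properties config) {
        String v = config.getProperty("jobmanager.io-executor.pool-size");
        // Fall back to the current behavior: one thread per CPU core.
        return v != null ? Integer.parseInt(v)
                         : Runtime.getRuntime().availableProcessors();
    }

    public static ExecutorService create(Properties config) {
        return Executors.newFixedThreadPool(poolSize(config));
    }
}
```

An io-bound disposal workload could then be given a pool much larger than the core count without affecting cpu-bound defaults.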
[jira] [Updated] (FLINK-9297) Make size of JobManager's ioExecutor configurable
[ https://issues.apache.org/jira/browse/FLINK-9297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jelmer Kuperus updated FLINK-9297: -- Description: With flink 1.4.0 cleaning of checkpoints is handled on the job manager, the SharedStateRegistry uses the job manager's [io executor|https://github.com/apache/flink/blob/e5ed2fbc4a31808bea8be89b371bbe4269288003/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1999] as the executor to schedule disposals of checkpoint state on. This executor has a fixed size equal to the number of cpu cores. When a lot of small files are created by many jobs and a slow filesystem is used this may not be enough. It would be good to make this setting configurable. Initializing an executor service with the number of cpu's on the system makes sense for cpu intensive tasks less so for io based workloads was: With flink 1.4.0 cleaning of checkpoints is handled on the job manager, the SharedStateRegistry uses the job manager's [io executor|https://github.com/apache/flink/blob/e5ed2fbc4a31808bea8be89b371bbe4269288003/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1999] as the executor to schedule disposals of checkpoint state on. This executor has a fixed size equal to the number of cpu cores. When a lot of small files are created this may not be enough. It would be good to make this setting configurable. 
Initializing an executor service with the number of CPUs on the system makes sense for CPU-intensive tasks, less so for IO-based workloads > Make size of JobManager's ioExecutor configurable > - > > Key: FLINK-9297 > URL: https://issues.apache.org/jira/browse/FLINK-9297 > Project: Flink > Issue Type: Improvement > Components: JobManager >Affects Versions: 1.4.0, 1.4.1, 1.4.2 >Reporter: Jelmer Kuperus >Priority: Major > > With Flink 1.4.0, cleaning of checkpoints is handled on the job manager; the > SharedStateRegistry uses the job manager's [io > executor|https://github.com/apache/flink/blob/e5ed2fbc4a31808bea8be89b371bbe4269288003/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L1999] > as the executor to schedule disposals of checkpoint state on. This > executor has a fixed size equal to the number of CPU cores. When a lot of > small files are created by many jobs and a slow filesystem is used, this may > not be enough. > It would be good to make this setting configurable. Initializing an executor > service with the number of CPUs on the system makes sense for CPU-intensive > tasks, less so for IO-based workloads. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
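The proposal above can be sketched as follows. This is a minimal, self-contained illustration, not Flink's actual code: the helper name and the idea of a configured pool size with a CPU-count fallback are assumptions for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IoExecutorSizing {

    // Hypothetical helper: pick the pool size from a configured value,
    // falling back to the number of cores. The core-count default suits
    // CPU-bound work, but can be too small for IO-bound checkpoint cleanup.
    public static int ioExecutorSize(int configuredSize) {
        return configuredSize > 0
                ? configuredSize
                : Runtime.getRuntime().availableProcessors();
    }

    public static ExecutorService createIoExecutor(int configuredSize) {
        return Executors.newFixedThreadPool(ioExecutorSize(configuredSize));
    }
}
```

With such a knob, a deployment that disposes many small checkpoint files on a slow filesystem could simply oversize the pool (e.g. pass 32) instead of being capped at the core count.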
[jira] [Commented] (FLINK-8497) KafkaConsumer throws NPE if topic doesn't exist
[ https://issues.apache.org/jira/browse/FLINK-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463984#comment-16463984 ] ASF GitHub Bot commented on FLINK-8497: --- Github user alexpf commented on a diff in the pull request: https://github.com/apache/flink/pull/5929#discussion_r186110995 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java --- @@ -74,7 +74,12 @@ protected void initializeConnections() { try { for (String topic : topics) { - for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) { + List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic); + if (topicPartitions == null) { + throw new IllegalStateException("The topic " + topic + " does not exist"); --- End diff -- I've taken a look into it. Ideally we have to make the go/fail decision one level higher, at `FlinkKafkaConsumerBase`. It uses the partition discovery both for the initial seed and further on during the run, so that's the component that should decide. The problem I see here is that `AbstractPartitionDiscoverer#discoverPartitions` doesn't just get the list of available partitions, but also filters the partitions applicable for the current task. So, once we get the partition list, we can't say whether the list is empty because nothing was found, or because the partitions have been post-filtered. The only way to communicate this difference, as I see it now, is to introduce some new specific exception, and catch it at the `FlinkKafkaConsumerBase`.
> KafkaConsumer throws NPE if topic doesn't exist > --- > > Key: FLINK-8497 > URL: https://issues.apache.org/jira/browse/FLINK-8497 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: chris snow >Assignee: Aleksei Lesnov >Priority: Minor > > If I accidentally set the kafka consumer with a topic that doesn't exist: > {code:java} > FlinkKafkaConsumer011<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer011<>( >"does_not_exist", > new JSONKeyValueDeserializationSchema(false), > properties > ); > DataStream<ObjectNode> input = env.addSource(kafkaConsumer);{code} > Flink throws NPE > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748){code} > Maybe Flink could throw an IllegalStateException("Topic not found")? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE...
Github user alexpf commented on a diff in the pull request: https://github.com/apache/flink/pull/5929#discussion_r186110995 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java --- @@ -74,7 +74,12 @@ protected void initializeConnections() { try { for (String topic : topics) { - for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) { + List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic); + if (topicPartitions == null) { + throw new IllegalStateException("The topic " + topic + " does not exist"); --- End diff -- I've taken a look into it. Ideally we have to make the go/fail decision one level higher, at `FlinkKafkaConsumerBase`. It uses the partition discovery both for the initial seed and further on during the run, so that's the component that should decide. The problem I see here is that `AbstractPartitionDiscoverer#discoverPartitions` doesn't just get the list of available partitions, but also filters the partitions applicable for the current task. So, once we get the partition list, we can't say whether the list is empty because nothing was found, or because the partitions have been post-filtered. The only way to communicate this difference, as I see it now, is to introduce some new specific exception, and catch it at the `FlinkKafkaConsumerBase`. ---
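The reviewer's suggestion of a dedicated exception can be sketched as follows. This is an illustrative, self-contained sketch: the exception name and the map standing in for the broker's metadata are assumptions, not the actual Kafka connector API (Kafka's `partitionsFor` does return null for an unknown topic, which is what the stub mimics).

```java
import java.util.List;
import java.util.Map;

public class PartitionDiscoverySketch {

    // Hypothetical dedicated exception so a caller such as
    // FlinkKafkaConsumerBase can tell "topic does not exist" apart from
    // "all partitions were filtered out for this subtask".
    public static class UnknownTopicException extends RuntimeException {
        public UnknownTopicException(String topic) {
            super("The topic " + topic + " does not exist");
        }
    }

    // Stand-in for kafkaConsumer.partitionsFor(topic): the map simulates
    // the broker's metadata, and a missing key simulates the null return
    // that currently leads to the NPE.
    public static List<Integer> partitionsFor(Map<String, List<Integer>> metadata, String topic) {
        List<Integer> partitions = metadata.get(topic);
        if (partitions == null) {
            throw new UnknownTopicException(topic);
        }
        return partitions;
    }
}
```

With a specific exception type, the consumer base class can catch it and decide whether to fail the job, while an empty (but non-null) partition list after filtering remains a normal outcome.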
[jira] [Updated] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-8978: -- Fix Version/s: (was: 1.5.1) 1.5.0 > End-to-end test: Job upgrade > > > Key: FLINK-8978 > URL: https://issues.apache.org/jira/browse/FLINK-8978 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > Job upgrades usually happen during the lifetime of a real-world Flink job. > Therefore, we should add an end-to-end test which exactly covers this > scenario. I suggest doing the following: > # run the general purpose testing job FLINK-8971 > # take a savepoint > # Modify the job by introducing a new operator and changing the order of > others > # Resume the modified job from the savepoint > # Verify that everything went correctly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-8978. - Resolution: Fixed > End-to-end test: Job upgrade > > > Key: FLINK-8978 > URL: https://issues.apache.org/jira/browse/FLINK-8978 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > Job upgrades usually happen during the lifetime of a real-world Flink job. > Therefore, we should add an end-to-end test which exactly covers this > scenario. I suggest doing the following: > # run the general purpose testing job FLINK-8971 > # take a savepoint > # Modify the job by introducing a new operator and changing the order of > others > # Resume the modified job from the savepoint > # Verify that everything went correctly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reopened FLINK-8978: --- Change fix version > End-to-end test: Job upgrade > > > Key: FLINK-8978 > URL: https://issues.apache.org/jira/browse/FLINK-8978 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > Job upgrades usually happen during the lifetime of a real-world Flink job. > Therefore, we should add an end-to-end test which exactly covers this > scenario. I suggest doing the following: > # run the general purpose testing job FLINK-8971 > # take a savepoint > # Modify the job by introducing a new operator and changing the order of > others > # Resume the modified job from the savepoint > # Verify that everything went correctly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-8978. - Resolution: Fixed Merged in: master: 5ac4d29609 release-1.5: 54befe5a31 > End-to-end test: Job upgrade > > > Key: FLINK-8978 > URL: https://issues.apache.org/jira/browse/FLINK-8978 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > Job upgrades usually happen during the lifetime of a real-world Flink job. > Therefore, we should add an end-to-end test which exactly covers this > scenario. I suggest doing the following: > # run the general purpose testing job FLINK-8971 > # take a savepoint > # Modify the job by introducing a new operator and changing the order of > others > # Resume the modified job from the savepoint > # Verify that everything went correctly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463918#comment-16463918 ] ASF GitHub Bot commented on FLINK-8978: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5947 > End-to-end test: Job upgrade > > > Key: FLINK-8978 > URL: https://issues.apache.org/jira/browse/FLINK-8978 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > Job upgrades usually happen during the lifetime of a real-world Flink job. > Therefore, we should add an end-to-end test which exactly covers this > scenario. I suggest doing the following: > # run the general purpose testing job FLINK-8971 > # take a savepoint > # Modify the job by introducing a new operator and changing the order of > others > # Resume the modified job from the savepoint > # Verify that everything went correctly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5947: [FLINK-8978] Stateful generic stream job upgrade e...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5947 ---
[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463907#comment-16463907 ] ASF GitHub Bot commented on FLINK-8978: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5947 LGTM Will merge this. > End-to-end test: Job upgrade > > > Key: FLINK-8978 > URL: https://issues.apache.org/jira/browse/FLINK-8978 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > Job upgrades usually happen during the lifetime of a real-world Flink job. > Therefore, we should add an end-to-end test which exactly covers this > scenario. I suggest doing the following: > # run the general purpose testing job FLINK-8971 > # take a savepoint > # Modify the job by introducing a new operator and changing the order of > others > # Resume the modified job from the savepoint > # Verify that everything went correctly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5947: [FLINK-8978] Stateful generic stream job upgrade e2e test
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5947 LGTM 👍 Will merge this. ---
[jira] [Closed] (FLINK-9254) Move NotSoMiniClusterIterations to be an end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-9254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-9254. - Resolution: Fixed Merged in: master: b50cebb656 release-1.5: ed3447e343 > Move NotSoMiniClusterIterations to be an end-to-end test > > > Key: FLINK-9254 > URL: https://issues.apache.org/jira/browse/FLINK-9254 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.5.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Minor > Fix For: 1.5.0 > > > NotSoMiniClusterIterations should be a test that we don’t run for every > commit but nightly e2e. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9254) Move NotSoMiniClusterIterations to be an end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-9254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463884#comment-16463884 ] ASF GitHub Bot commented on FLINK-9254: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5921 > Move NotSoMiniClusterIterations to be an end-to-end test > > > Key: FLINK-9254 > URL: https://issues.apache.org/jira/browse/FLINK-9254 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.5.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Minor > Fix For: 1.5.0 > > > NotSoMiniClusterIterations should be a test that we don’t run for every > commit but nightly e2e. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5921: [FLINK-9254] Move NotSoMiniClusterIterations to be...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5921 ---
[jira] [Commented] (FLINK-9254) Move NotSoMiniClusterIterations to be an end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-9254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463876#comment-16463876 ] ASF GitHub Bot commented on FLINK-9254: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5921 In that case, LGTM Will merge this. > Move NotSoMiniClusterIterations to be an end-to-end test > > > Key: FLINK-9254 > URL: https://issues.apache.org/jira/browse/FLINK-9254 > Project: Flink > Issue Type: Test > Components: Tests >Affects Versions: 1.5.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Minor > Fix For: 1.5.0 > > > NotSoMiniClusterIterations should be a test that we don’t run for every > commit but nightly e2e. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5921: [FLINK-9254] Move NotSoMiniClusterIterations to be an end...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5921 In that case, LGTM 👍 Will merge this. ---
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463873#comment-16463873 ] ASF GitHub Bot commented on FLINK-9169: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5950 Overall, I think this looks good for me now > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65 > 6fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB > ackend.java:1216) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye > dStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1 > 139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. 
This is not the problem, however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} > where we null-check the state serializer. This will then fail with an > uninformative NPE. > I think the same should happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
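The fix direction described in the issue, failing fast with a descriptive message instead of a bare NPE, can be sketched as follows. The helper name and message wording are illustrative, not Flink's actual code.

```java
public class RestoreChecks {

    // Hypothetical replacement for the bare null check: fail with a message
    // that names the state whose serializer could not be read, instead of
    // letting a NullPointerException surface deep inside the restore path.
    public static <T> T checkRestoredSerializer(T serializer, String stateName) {
        if (serializer == null) {
            throw new IllegalStateException(
                "Could not deserialize the state serializer for state '" + stateName
                + "'. Its class may be missing from the user-code classpath,"
                + " or the savepoint may come from an incompatible Flink version.");
        }
        return serializer;
    }
}
```

The error surfaced to the user then points directly at the unreadable serializer and the likely cause, rather than at an internal precondition.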
[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5950 Overall, I think this looks good to me now 👍 ---
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463869#comment-16463869 ] ASF GitHub Bot commented on FLINK-9169: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r186082804 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java --- @@ -102,17 +99,24 @@ * * @return the deserialized serializer. */ - public static TypeSerializer tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader, boolean useDummyPlaceholder) { + public static TypeSerializer tryReadSerializer( + DataInputView in, + ClassLoader userCodeClassLoader, + boolean useDummyPlaceholder) throws IOException { + final TypeSerializerSerializationUtil.TypeSerializerSerializationProxy proxy = - new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader, useDummyPlaceholder); + new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader); try { proxy.read(in); return proxy.getTypeSerializer(); - } catch (IOException e) { - LOG.warn("Deserialization of serializer errored; replacing with null.", e); - - return null; + } catch (UnloadableTypeSerializerException e) { --- End diff -- I would let this bubble up one more level, remove the flag here and only catch `UnloadableTypeSerializerException ` in the case where this method is called with `true`. > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. 
> {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65 > 6fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB > ackend.java:1216) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye > dStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1 > 139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 
6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. This is not the problem, however, in >
[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r186082804 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java --- @@ -102,17 +99,24 @@ * * @return the deserialized serializer. */ - public static TypeSerializer tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader, boolean useDummyPlaceholder) { + public static TypeSerializer tryReadSerializer( + DataInputView in, + ClassLoader userCodeClassLoader, + boolean useDummyPlaceholder) throws IOException { + final TypeSerializerSerializationUtil.TypeSerializerSerializationProxy proxy = - new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader, useDummyPlaceholder); + new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader); try { proxy.read(in); return proxy.getTypeSerializer(); - } catch (IOException e) { - LOG.warn("Deserialization of serializer errored; replacing with null.", e); - - return null; + } catch (UnloadableTypeSerializerException e) { --- End diff -- I would let this bubble up one more level, remove the flag here and only catch `UnloadableTypeSerializerException ` in the case where this method is called with `true`. ---
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463867#comment-16463867 ] ASF GitHub Bot commented on FLINK-9169: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5950 Yes, I think we can only remove the flag further when splitting up `readSerializersAndConfigsWithResilience`, but I guess it is ok to leave it if we change this soon anyways. > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65 > 6fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB > ackend.java:1216) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye > dStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1 > 139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. 
This is not the problem, however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} > where we null check the state serializer. This will then fail with an > indescriptive NPE. > I think the same should happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5950 Yes, I think we can only fully remove the flag when splitting up `readSerializersAndConfigsWithResilience`, but I guess it is OK to leave it if we change this soon anyway. ---
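The control flow under discussion, letting a dedicated exception bubble up and substituting a placeholder only in the resilient caller, can be sketched as below. All names and return values are illustrative stand-ins, not the actual Flink serialization API.

```java
public class SerializerReadSketch {

    // Unchecked stand-in for the PR's dedicated exception type.
    public static class UnloadableSerializerException extends RuntimeException {}

    // Stand-in for tryReadSerializer without the boolean flag: a read
    // failure always surfaces as the dedicated exception.
    public static String readSerializer(byte[] bytes) {
        if (bytes == null) {
            throw new UnloadableSerializerException();
        }
        return "serializer";
    }

    // Only the caller that opted into resilience substitutes a placeholder;
    // every other caller sees the exception bubble up, as the review suggests.
    public static String readSerializerResilient(byte[] bytes, boolean useDummyPlaceholder) {
        try {
            return readSerializer(bytes);
        } catch (UnloadableSerializerException e) {
            if (useDummyPlaceholder) {
                return "dummy-placeholder";
            }
            throw e;
        }
    }
}
```

This keeps the low-level read path free of the flag; the decision about tolerating an unreadable serializer moves to the one call site that needs it.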
[jira] [Commented] (FLINK-9292) Remove TypeInfoParser
[ https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463784#comment-16463784 ] Stephan Ewen commented on FLINK-9292: - This issue needs a lot of care to not break functionality. I have a first part of that done already, taking care also of the deprecated parts of the DataSet API. I would recommend for that to be merged first... A lot of remaining work is actually adjusting tests... > Remove TypeInfoParser > - > > Key: FLINK-9292 > URL: https://issues.apache.org/jira/browse/FLINK-9292 > Project: Flink > Issue Type: Task > Components: Core >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > > The {{TypeInfoParser}} has been deprecated, in favor of the {{TypeHint}}. > Because the TypeInfoParser is also not working correctly with respect to > classloading, we should remove it. Users still find the class, try to use it, > and run into problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463761#comment-16463761 ] ASF GitHub Bot commented on FLINK-8690: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186033706 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase { (8000L, "8", "Hello World"), (2L, "20", "Hello World")) + @Test + def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = { +// create a watermark with 10ms offset to delay the window emission by 10ms to verify merge +val sessionWindowTestdata = List( + (1L, 1, "Hello"), + (2L, 2, "Hello"), + (8L, 8, "Hello"), + (9L, 9, "Hello World"), + (4L, 4, "Hello"), + (16L, 16, "Hello")) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setParallelism(1) +StreamITCase.clear +val stream = env + .fromCollection(sessionWindowTestdata) + .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L)) + +val tEnv = TableEnvironment.getTableEnvironment(env) +val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime) +tEnv.registerTable("MyTable", table) + +val sqlQuery = "SELECT string, " + + " COUNT(DISTINCT long) " + --- End diff -- It would be good to add the end timestamp of the windows (`SESSION_END(rowtime, INTERVAL '0.005' SECOND)`) to make it easier to eyeball the expected test results. > Support distinct aggregation on group windowed streaming tables. 
> > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463760#comment-16463760 ] ASF GitHub Bot commented on FLINK-8690: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186022377 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -641,7 +641,8 @@ class AggregationCodeGenerator( | java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next(); | Object k = entry.getKey(); --- End diff -- Change this line to `${classOf[Row].getCanonicalName} k = (${classOf[Row].getCanonicalName}) entry.getKey();` > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463763#comment-16463763 ] ASF GitHub Bot commented on FLINK-8690: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186029985 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.plan.logical.{SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow} +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.table.utils.TableTestUtil._ +import org.junit.{Ignore, Test} + +class DistinctAggregateTest extends TableTestBase { + private val streamUtil: StreamTableTestUtil = streamTestUtil() + streamUtil.addTable[(Int, String, Long)]( +"MyTable", +'a, 'b, 'c, +'proctime.proctime, 'rowtime.rowtime) + + @Test + def testDistinct(): Unit = { +val sql = "SELECT DISTINCT a, b, c FROM MyTable" + +val expected = + unaryNode( +"DataStreamGroupAggregate", +unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a, b, c") +), +term("groupBy", "a, b, c"), +term("select", "a, b, c") + ) +streamUtil.verifySql(sql, expected) + } + + // TODO: this query should be optimized to only have a single DataStreamGroupAggregate + // TODO: reopen this until FLINK-7144 fixed + @Ignore + @Test + def testDistinctAfterAggregate(): Unit = { +val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c" + +val expected = + unaryNode( +"DataStreamGroupAggregate", +unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a") +), +term("groupBy", "a"), +term("select", "a") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testDistinctAggregateOnTumbleWindow(): Unit = { +val sqlQuery = "SELECT COUNT(DISTINCT a), " + + " SUM(a) " + + "FROM MyTable " + + "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) " + +val expected = unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( +"DataStreamCalc", +streamTableNode(0), +term("select", "rowtime", "a") + ), + term("window", TumblingGroupWindow('w$, 'rowtime, 90.millis)), + term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(a) AS EXPR$1") +) + +streamUtil.verifySql(sqlQuery, expected) + } + + 
@Test + def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = { +val sqlQuery = "SELECT COUNT(DISTINCT a), " + + " SUM(DISTINCT a), " + + " MAX(DISTINCT a) " + + "FROM MyTable " + + "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " + +val expected = unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( +"DataStreamCalc", +streamTableNode(0), +term("select", "rowtime", "a") + ), + term("window", SlidingGroupWindow('w$, 'rowtime, 360.millis, 90.millis)), + term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT a) AS EXPR$1", +"MAX(DISTINCT a) AS EXPR$2") +) + +streamUtil.verifySql(sqlQuery, expected) + } + + @Test + def testDistinctAggregateWithGroupingOnSessionWindow(): Unit = { +val sqlQuery = "SELECT a, " + + "
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463759#comment-16463759 ] ASF GitHub Bot commented on FLINK-8690: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186033401 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase { (8000L, "8", "Hello World"), (2L, "20", "Hello World")) + @Test + def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = { +// create a watermark with 10ms offset to delay the window emission by 10ms to verify merge +val sessionWindowTestdata = List( + (1L, 1, "Hello"), --- End diff -- The test is not checking for DISTINCT semantics since all aggregated values are distinct. We could do `COUNT(DISTINCT num)` (`int` has to be renamed to `num` because it's a SQL keyword). > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463758#comment-16463758 ] ASF GitHub Bot commented on FLINK-8690: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r185955967 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -641,7 +641,8 @@ class AggregationCodeGenerator( | java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next(); | Object k = entry.getKey(); | Long v = (Long) entry.getValue(); - | if (aDistinctAcc$i.add(k, v)) { + | if (aDistinctAcc$i.add( --- End diff -- The key in the entry is a `Row` already > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463762#comment-16463762 ] ASF GitHub Bot commented on FLINK-8690: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186036107 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase { (8000L, "8", "Hello World"), (2L, "20", "Hello World")) + @Test + def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = { +// create a watermark with 10ms offset to delay the window emission by 10ms to verify merge +val sessionWindowTestdata = List( + (1L, 1, "Hello"), --- End diff -- To check the correct merge behavior, we need two windows which aggregate the same value that is then deduplicated in merge. Some data like: ``` (1L, 2, "Hello"), // 1. Hello window (2L, 2, "Hello"), // 1. Hello window, deduped (8L, 2, "Hello"), // 2. Hello window, deduped during merge (10L, 3, "Hello"), // 2. Hello window, forwarded during merge (9L, 9, "Hello World"), // 1. Hello World window (4L, 1, "Hello"), // 1. Hello window, triggering merge of 1. and 2. Hello windows (16L, 16, "Hello")) // 3. Hello window (not merged) ``` > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
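The merge behavior discussed above can be sketched without Flink: a distinct accumulator keeps a count per key, `add()` reports whether the key is new, and merging sums the per-key counts, so a value seen in both session windows is still counted exactly once. The `DistinctAcc` class below is a hypothetical illustration of that idea, not the actual generated code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Flink's generated code) of a counting distinct
// accumulator: add() returns true only when a key is seen for the first
// time, and merging two accumulators sums the per-key counts, so a value
// present in both session windows is still counted exactly once.
class DistinctAcc {
    final Map<Object, Long> counts = new HashMap<>();

    // returns true if the key was absent before, i.e. the wrapped
    // aggregate should see this value
    boolean add(Object key, long times) {
        return counts.merge(key, times, Long::sum) == times;
    }

    void merge(DistinctAcc other) {
        other.counts.forEach((k, v) -> counts.merge(k, v, Long::sum));
    }

    long distinctCount() {
        return counts.size();
    }
}

public class DistinctMergeDemo {
    public static void main(String[] args) {
        DistinctAcc w1 = new DistinctAcc(); // first session window
        DistinctAcc w2 = new DistinctAcc(); // second session window
        w1.add(2, 1); w1.add(2, 1);         // value 2 twice, deduped locally
        w2.add(2, 1); w2.add(3, 1);         // value 2 again, plus value 3
        w1.merge(w2);                       // session merge dedupes value 2
        System.out.println(w1.distinctCount()); // prints 2
    }
}
```

Keeping counts (rather than a plain set) is what allows retractions and merges to stay consistent, which is why the generated code tracks a `Long` per key.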
[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover
[ https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463715#comment-16463715 ] ASF GitHub Bot commented on FLINK-8946: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5869 cc @zentol @GJL > TaskManager stop sending metrics after JobManager failover > -- > > Key: FLINK-8946 > URL: https://issues.apache.org/jira/browse/FLINK-8946 > Project: Flink > Issue Type: Bug > Components: Metrics, TaskManager >Affects Versions: 1.4.2 >Reporter: Truong Duc Kien >Assignee: vinoyang >Priority: Critical > Fix For: 1.5.0 > > > Running in Yarn-standalone mode, when the Job Manager performs a failover, > all TaskManagers that are inherited from the previous JobManager will not send > metrics to the new JobManager and other registered metric reporters. > > A cursory glance reveals that these lines of code might be the cause: > [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086] > Perhaps the TaskManager closes its metrics group when disassociating from the > JobManager, but does not create a new one on fail-over association? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9296) Support distinct aggregation on non-windowed grouped streaming tables
Fabian Hueske created FLINK-9296: Summary: Support distinct aggregation on non-windowed grouped streaming tables Key: FLINK-9296 URL: https://issues.apache.org/jira/browse/FLINK-9296 Project: Flink Issue Type: Sub-task Components: Table API SQL Reporter: Fabian Hueske -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9232) Add harness test for AggregationCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-9232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-9232: - Component/s: Table API & SQL > Add harness test for AggregationCodeGenerator > -- > > Key: FLINK-9232 > URL: https://issues.apache.org/jira/browse/FLINK-9232 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Instead of relying on ITCase to cover the codegen result. We should have > direct test against that, for example using Harness test framework. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8691) Update table API to support distinct operator on data stream
[ https://issues.apache.org/jira/browse/FLINK-8691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-8691: - Component/s: Table API & SQL > Update table API to support distinct operator on data stream > > > Key: FLINK-8691 > URL: https://issues.apache.org/jira/browse/FLINK-8691 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8739) Optimize runtime support for distinct filter
[ https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-8739: - Component/s: Table API & SQL > Optimize runtime support for distinct filter > > > Key: FLINK-8739 > URL: https://issues.apache.org/jira/browse/FLINK-8739 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > Possible optimizaitons: > 1. Decouple distinct map and actual accumulator so that they can separately > be created in codegen. > 2. Reuse same distinct accumulator for filtering, e.g. `SELECT > COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
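The reuse proposed in point 2 above can be illustrated with a single shared set acting as the distinct filter for both aggregates. The helper below is a hypothetical sketch of the idea, not Flink's actual accumulator code:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed optimization: a single shared
// distinct filter (here a HashSet) dedupes the input once, and both
// COUNT(DISTINCT a) and SUM(DISTINCT a) consume only first-seen values,
// instead of each aggregate maintaining its own distinct map.
public class SharedDistinctDemo {

    // returns {distinct count, distinct sum}
    static long[] countAndSumDistinct(int[] values) {
        Set<Integer> seen = new HashSet<>(); // shared by both aggregates
        long count = 0, sum = 0;
        for (int a : values) {
            if (seen.add(a)) { // true only on the first occurrence of a
                count++;
                sum += a;
            }
        }
        return new long[]{count, sum};
    }

    public static void main(String[] args) {
        long[] r = countAndSumDistinct(new int[]{5, 3, 5, 7, 3});
        System.out.println(r[0] + " " + r[1]); // prints 3 15
    }
}
```

With one map instead of two, state size and per-record lookups are roughly halved for queries like `SELECT COUNT(DISTINCT a), SUM(DISTINCT a)`.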
[jira] [Closed] (FLINK-6335) Parse DISTINCT over grouped window in stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-6335. Resolution: Duplicate Duplicate of FLINK-8690 > Parse DISTINCT over grouped window in stream SQL > > > Key: FLINK-6335 > URL: https://issues.apache.org/jira/browse/FLINK-6335 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Major > > The SQL on the batch side supports the {{DISTINCT}} keyword over aggregation. > This jira proposes to support the {{DISTINCT}} keyword on streaming > aggregation using the same technique on the batch side. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-8690: - Summary: Support distinct aggregation on group windowed streaming tables. (was: Update logical rule set to generate FlinkLogicalAggregate explicitly allow distinct agg on DataStream) > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.
[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-8690: - Component/s: Table API & SQL > Support distinct aggregation on group windowed streaming tables. > > > Key: FLINK-8690 > URL: https://issues.apache.org/jira/browse/FLINK-8690 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not > allow distinct aggregate. > We are proposing to reuse distinct aggregate codegen work designed for > *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on > datastream as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-6373) Add runtime support for distinct aggregation over grouped windows
[ https://issues.apache.org/jira/browse/FLINK-6373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-6373. Resolution: Duplicate Was implemented as FLINK-8689. > Add runtime support for distinct aggregation over grouped windows > - > > Key: FLINK-6373 > URL: https://issues.apache.org/jira/browse/FLINK-6373 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Major > > This is a follow up task for FLINK-6335. FLINK-6335 enables parsing the > distinct aggregations over grouped windows. This jira tracks the effort of > adding runtime support for the query. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
[ https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463642#comment-16463642 ] ASF GitHub Bot commented on FLINK-7775: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5799 cc @zentol @GJL this PR has been open for a long time and the Travis build error seems unrelated. Can you review it? Thanks. > Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs > --- > > Key: FLINK-7775 > URL: https://issues.apache.org/jira/browse/FLINK-7775 > Project: Flink > Issue Type: Task > Components: Local Runtime >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > {code} > public int getNumberOfCachedJobs() { > return jobRefCounters.size(); > } > {code} > This method of PermanentBlobCache is not used. > We should remove it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5495: [FLINK-8659] Add migration itcases for broadcast state.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5495 I close this and I will open an updated one. ---
[GitHub] flink pull request #5495: [FLINK-8659] Add migration itcases for broadcast s...
Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/5495 ---
[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.
[ https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463628#comment-16463628 ] ASF GitHub Bot commented on FLINK-8659: --- Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/5495 > Add migration tests for Broadcast state. > > > Key: FLINK-8659 > URL: https://issues.apache.org/jira/browse/FLINK-8659 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/5955 [FLINK-8659] Add migration itcases for broadcast state. As the name implies, this PR adds migration tests for the newly introduced broadcast state. For the `scala` case, more refactoring is required so that the shared code between the tests is better distributed, but this is a broader refactoring. It requires the same work that was done for the previous case of the `java` migration tests. R @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink migration-inv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5955.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5955 commit 9ae20e37b557e9ca482bd61cb57e8a6001a7eb6e Author: kkloudas Date: 2018-05-03T08:05:13Z [FLINK-8659] Add migration itcases for broadcast state. ---
[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.
[ https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463634#comment-16463634 ] ASF GitHub Bot commented on FLINK-8659: --- GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/5955 [FLINK-8659] Add migration itcases for broadcast state. As the name implies, this PR add migration tests for the newly introduced broadcast state. For the `scala` case, more refactoring is required so that the shared code between the tests is better distributed, but this is a broader refactoring. It requires the same work that was done for the previous case of the `java` migration tests. R @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink migration-inv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5955.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5955 commit 9ae20e37b557e9ca482bd61cb57e8a6001a7eb6e Author: kkloudasDate: 2018-05-03T08:05:13Z [FLINK-8659] Add migration itcases for broadcast state. > Add migration tests for Broadcast state. > > > Key: FLINK-8659 > URL: https://issues.apache.org/jira/browse/FLINK-8659 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.
[ https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463630#comment-16463630 ] ASF GitHub Bot commented on FLINK-8659: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5495 I'm closing this and will open an updated one. > Add migration tests for Broadcast state. > > > Key: FLINK-8659 > URL: https://issues.apache.org/jira/browse/FLINK-8659 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9245) Can't create a BucketingSink with a provided Configuration if no hadoop defaults
[ https://issues.apache.org/jira/browse/FLINK-9245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reopened FLINK-9245: - reopen to remove fixVersion > Can't create a BucketingSink with a provided Configuration if no hadoop > defaults > > > Key: FLINK-9245 > URL: https://issues.apache.org/jira/browse/FLINK-9245 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.4.2 >Reporter: Julien Cuquemelle >Priority: Minor > > We build integration tests using this kind of code: > {code:java} > val bucketingSink = new > BucketingSink[Row]("hdfs:///user/$USER/application_name/") > bucketingSink.setFSConfig(hadoopRule.getConfiguration.hdfs) > bucketingSink.setBucketer(new DateTimeBucketer[Row]("yyyy-MM-dd--HHmm")) > outputStream.addSink(bucketingSink) > {code} > Here, the hadoopRule is providing a valid hdfs config that should allow this > kind of code to run on a machine with no HADOOP_HOME or HADOOP_CONF_DIR set > up, like a developer workstation or a Jenkins slave. > When running this code on such a machine, the .createHadoopFileSystem(...) > fails with > {noformat} > The given file system URI (hdfs:///user/$USER/application_name/) did not > describe the authority > at > org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149) > at > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401){noformat} > because it tries to instantiate the fileSystem from a default configuration > in .getUnguardedFileSystem(); as the default conf doesn't exist, the default > filesystem resolves to "file:///" and the consistency check of the > URI fails because no authority can be found. So the whole filesystem creation > fails before actually trying to use the provided config. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9245) Can't create a BucketingSink with a provided Configuration if no hadoop defaults
[ https://issues.apache.org/jira/browse/FLINK-9245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-9245. --- Resolution: Not A Problem > Can't create a BucketingSink with a provided Configuration if no hadoop > defaults > > > Key: FLINK-9245 > URL: https://issues.apache.org/jira/browse/FLINK-9245 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.4.2 >Reporter: Julien Cuquemelle >Priority: Minor > > We build integration tests using this kind of code: > {code:java} > val bucketingSink = new > BucketingSink[Row]("hdfs:///user/$USER/application_name/") > bucketingSink.setFSConfig(hadoopRule.getConfiguration.hdfs) > bucketingSink.setBucketer(new DateTimeBucketer[Row]("yyyy-MM-dd--HHmm")) > outputStream.addSink(bucketingSink) > {code} > Here, the hadoopRule is providing a valid hdfs config that should allow this > kind of code to run on a machine with no HADOOP_HOME or HADOOP_CONF_DIR set > up, like a developer workstation or a Jenkins slave. > When running this code on such a machine, the .createHadoopFileSystem(...) > fails with > {noformat} > The given file system URI (hdfs:///user/$USER/application_name/) did not > describe the authority > at > org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149) > at > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401){noformat} > because it tries to instantiate the fileSystem from a default configuration > in .getUnguardedFileSystem(); as the default conf doesn't exist, the default > filesystem resolves to "file:///" and the consistency check of the > URI fails because no authority can be found. So the whole filesystem creation > fails before actually trying to use the provided config. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9245) Can't create a BucketingSink with a provided Configuration if no hadoop defaults
[ https://issues.apache.org/jira/browse/FLINK-9245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-9245: Fix Version/s: (was: 1.6.0) > Can't create a BucketingSink with a provided Configuration if no hadoop > defaults > > > Key: FLINK-9245 > URL: https://issues.apache.org/jira/browse/FLINK-9245 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.4.2 >Reporter: Julien Cuquemelle >Priority: Minor > > We build integration tests using this kind of code: > {code:java} > val bucketingSink = new > BucketingSink[Row]("hdfs:///user/$USER/application_name/") > bucketingSink.setFSConfig(hadoopRule.getConfiguration.hdfs) > bucketingSink.setBucketer(new DateTimeBucketer[Row]("yyyy-MM-dd--HHmm")) > outputStream.addSink(bucketingSink) > {code} > Here, the hadoopRule is providing a valid hdfs config that should allow this > kind of code to run on a machine with no HADOOP_HOME or HADOOP_CONF_DIR set > up, like a developer workstation or a Jenkins slave. > When running this code on such a machine, the .createHadoopFileSystem(...) > fails with > {noformat} > The given file system URI (hdfs:///user/$USER/application_name/) did not > describe the authority > at > org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149) > at > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401){noformat} > because it tries to instantiate the fileSystem from a default configuration > in .getUnguardedFileSystem(); as the default conf doesn't exist, the default > filesystem resolves to "file:///" and the consistency check of the > URI fails because no authority can be found. So the whole filesystem creation > fails before actually trying to use the provided config. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9245) Can't create a BucketingSink with a provided Configuration if no hadoop defaults
[ https://issues.apache.org/jira/browse/FLINK-9245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-9245. --- Resolution: Not A Problem Closing now but please reopen if specifying an authority (host) doesn't fix it. > Can't create a BucketingSink with a provided Configuration if no hadoop > defaults > > > Key: FLINK-9245 > URL: https://issues.apache.org/jira/browse/FLINK-9245 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.4.2 >Reporter: Julien Cuquemelle >Priority: Minor > Fix For: 1.6.0 > > > We build integration tests using this kind of code: > {code:java} > val bucketingSink = new > BucketingSink[Row]("hdfs:///user/$USER/application_name/") > bucketingSink.setFSConfig(hadoopRule.getConfiguration.hdfs) > bucketingSink.setBucketer(new DateTimeBucketer[Row]("yyyy-MM-dd--HHmm")) > outputStream.addSink(bucketingSink) > {code} > Here, the hadoopRule is providing a valid hdfs config that should allow this > kind of code to run on a machine with no HADOOP_HOME or HADOOP_CONF_DIR set > up, like a developer workstation or a Jenkins slave. > When running this code on such a machine, the .createHadoopFileSystem(...) > fails with > {noformat} > The given file system URI (hdfs:///user/$USER/application_name/) did not > describe the authority > at > org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149) > at > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401){noformat} > because it tries to instantiate the fileSystem from a default configuration > in .getUnguardedFileSystem(); as the default conf doesn't exist, the default > filesystem resolves to "file:///" and the consistency check of the > URI fails because no authority can be found. So the whole filesystem creation > fails before actually trying to use the provided config. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
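The root cause described in the issue (an HDFS URI with no authority component) can be reproduced with plain `java.net.URI`; the namenode host and port below are illustrative values, not taken from the report:

```java
import java.net.URI;

public class AuthorityCheck {
    public static void main(String[] args) {
        // "hdfs:///..." carries an empty authority component, which is what
        // HadoopFsFactory rejects when no default Hadoop config supplies one.
        URI noAuthority = URI.create("hdfs:///user/someone/application_name/");
        System.out.println(noAuthority.getAuthority()); // null

        // Spelling out the namenode host (and port) supplies the authority,
        // matching the workaround suggested when the issue was closed.
        URI withAuthority = URI.create("hdfs://namenode:8020/user/someone/application_name/");
        System.out.println(withAuthority.getAuthority()); // namenode:8020
    }
}
```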
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463605#comment-16463605 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on the issue: https://github.com/apache/flink/pull/5939 I'm closing this Pull Request and will make a new one with the newer approach as discussed in the JIRA issue. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463606#comment-16463606 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing closed the pull request at: https://github.com/apache/flink/pull/5939 > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5939: [FLINK-8500] [Kafka Connector] Get the timestamp o...
Github user FredTing closed the pull request at: https://github.com/apache/flink/pull/5939 ---
[GitHub] flink issue #5939: [FLINK-8500] [Kafka Connector] Get the timestamp of the K...
Github user FredTing commented on the issue: https://github.com/apache/flink/pull/5939 I'm closing this Pull Request and will make a new one with the newer approach as discussed in the JIRA issue. ---
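A minimal sketch of the kind of change FLINK-8500 asks for: the deserialize callback is additionally handed the record timestamp taken from the Kafka ConsumerRecord. The interface and names here are illustrative, not Flink's actual API:

```java
import java.nio.charset.StandardCharsets;

public class TimestampDemo {

    // Hypothetical variant of KeyedDeserializationSchema#deserialize that
    // also receives the Kafka record timestamp as a parameter.
    interface TimestampedDeserializationSchema<T> {
        T deserialize(byte[] key, byte[] value, String topic,
                      int partition, long offset, long timestamp);
    }

    // Sample implementation: tag each decoded value with its event timestamp.
    static final TimestampedDeserializationSchema<String> SCHEMA =
        (key, value, topic, partition, offset, timestamp) ->
            new String(value, StandardCharsets.UTF_8) + "@" + timestamp;

    public static void main(String[] args) {
        String out = SCHEMA.deserialize(
            null, "hello".getBytes(StandardCharsets.UTF_8),
            "events", 0, 42L, 1500000000000L);
        System.out.println(out); // hello@1500000000000
    }
}
```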
[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios
[ https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463583#comment-16463583 ] ASF GitHub Bot commented on FLINK-6206: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5399 @casidiablo yes that is correct. > Log task state transitions as warn/error for FAILURE scenarios > -- > > Key: FLINK-6206 > URL: https://issues.apache.org/jira/browse/FLINK-6206 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Dan Bress >Priority: Critical > > If a task fails due to an exception, I would like that to be logged at a warn > or an error level. Currently it's info: > {code} > private boolean transitionState(ExecutionState currentState, ExecutionState > newState, Throwable cause) { > if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { > if (cause == null) { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState); > } else { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState, cause); > } > return true; > } else { > return false; > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5399: [FLINK-6206] [runtime] Use LOG.error() when logging failu...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5399 @casidiablo yes that is correct. ---
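The gist of the change requested in FLINK-6206 is to choose the log level from the target state instead of always logging at INFO in transitionState. A stand-alone sketch of that decision (ExecutionState here is a stand-in enum, not Flink's class):

```java
public class TransitionLogDemo {

    // Stand-in for Flink's ExecutionState enum.
    enum ExecutionState { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

    // FAILED transitions are surfaced at ERROR; everything else stays at INFO.
    static String levelFor(ExecutionState newState) {
        return newState == ExecutionState.FAILED ? "ERROR" : "INFO";
    }

    public static void main(String[] args) {
        System.out.println(levelFor(ExecutionState.FAILED));   // ERROR
        System.out.println(levelFor(ExecutionState.FINISHED)); // INFO
    }
}
```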
[jira] [Commented] (FLINK-9276) Improve error message when TaskManager fails
[ https://issues.apache.org/jira/browse/FLINK-9276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463579#comment-16463579 ] ASF GitHub Bot commented on FLINK-9276: --- GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/5954 [FLINK-9276] Improve error message when TaskManager fails ## What is the purpose of the change *This pull request improves the error message when a TaskManager fails* ## Brief change log - *add an Exception param to the method `SlotPoolGateway#releaseTaskManager`* - *refactor the usage of method `SlotPoolGateway#releaseTaskManager`* ## Verifying this change This change is already covered by existing tests, such as *SchedulerTestBase/SchedulerIsolatedTasks*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(not applicable / docs / JavaDocs / **not documented**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-9276 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5954.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5954 commit 904f762e6b495ea6b4ffd445b9168f001f74be26 Author: yanghua Date: 2018-05-04T08:52:38Z [FLINK-9276] Improve error message when TaskManager fails > Improve error message when TaskManager fails > > > Key: FLINK-9276 > URL: https://issues.apache.org/jira/browse/FLINK-9276 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Critical > > When a TaskManager fails, we frequently get a message > {code} > org.apache.flink.util.FlinkException: Releasing TaskManager > container_1524853016208_0001_01_000102 > {code} > This message is misleading in that it sounds like an intended operation, when > it really is a failure of a container that the {{ResourceManager}} reports to > the {{JobManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/5954 [FLINK-9276] Improve error message when TaskManager fails ## What is the purpose of the change *This pull request improves the error message when a TaskManager fails* ## Brief change log - *add an Exception param to the method `SlotPoolGateway#releaseTaskManager`* - *refactor the usage of method `SlotPoolGateway#releaseTaskManager`* ## Verifying this change This change is already covered by existing tests, such as *SchedulerTestBase/SchedulerIsolatedTasks*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-9276 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5954.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5954 commit 904f762e6b495ea6b4ffd445b9168f001f74be26 Author: yanghua Date: 2018-05-04T08:52:38Z [FLINK-9276] Improve error message when TaskManager fails ---
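The effect of threading an Exception through releaseTaskManager can be sketched without Flink: with a cause attached, the release message explains why the TaskManager was released instead of sounding like an intended operation. Names and message format below are illustrative, not the actual PR code:

```java
public class ReleaseMessageDemo {

    // With a cause, the message explains why the TaskManager was released.
    static String releaseMessage(String containerId, Exception cause) {
        String base = "Releasing TaskManager " + containerId;
        return cause == null ? base : base + " because: " + cause.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(releaseMessage(
            "container_1524853016208_0001_01_000102",
            new Exception("container completed with a non-zero exit code")));
    }
}
```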
[jira] [Closed] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-8620. --- Resolution: Fixed Fix Version/s: 1.6.0 > Enable shipping custom artifacts to BlobStore and accessing them through > DistributedCache > - > > Key: FLINK-8620 > URL: https://issues.apache.org/jira/browse/FLINK-8620 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Fix For: 1.6.0 > > > We should be able to distribute custom files to taskmanagers. To do that we > can store those files in BlobStore and later on access them in TaskManagers > through DistributedCache. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-7151: - Assignee: Shuyi Chen (was: yuemeng) > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: yuemeng >Assignee: Shuyi Chen >Priority: Major > > Based on CREATE TEMPORARY FUNCTION and TABLE, we can register a UDF, UDAF, or UDTF > using SQL: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-9169: --- Priority: Blocker (was: Major) > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216) > at > 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. This is not the problem; however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} > where we null check the state serializer. This will then fail with a > non-descriptive NPE. > I think the same should happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
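The fix direction described in the issue (failing with a descriptive message instead of a bare NullPointerException when the restored state serializer is null) can be sketched like this; it is a stand-alone illustration, not the actual Flink patch:

```java
public class SerializerCheckDemo {

    // Instead of a bare null check that throws an NPE, report which state is
    // affected and a likely cause (the error text here is hypothetical).
    static <S> S checkRestoredSerializer(S serializer, String stateName) {
        if (serializer == null) {
            throw new IllegalStateException(
                "Could not restore the serializer for state '" + stateName
                + "'. Its class may no longer be on the classpath when "
                + "resuming from an older savepoint.");
        }
        return serializer;
    }

    public static void main(String[] args) {
        try {
            checkRestoredSerializer(null, "my-keyed-state");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```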
[jira] [Commented] (FLINK-9292) Remove TypeInfoParser
[ https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463542#comment-16463542 ] vinoyang commented on FLINK-9292: - [~aljoscha] and [~StephanEwen] can we remove the specific method's implementation and throw a specific exception? > Remove TypeInfoParser > - > > Key: FLINK-9292 > URL: https://issues.apache.org/jira/browse/FLINK-9292 > Project: Flink > Issue Type: Task > Components: Core >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > > The {{TypeInfoParser}} has been deprecated, in favor of the {{TypeHint}}. > Because the TypeInfoParser is also not working correctly with respect to > classloading, we should remove it. Users still find the class, try to use it, > and run into problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9292) Remove TypeInfoParser
[ https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463534#comment-16463534 ] vinoyang commented on FLINK-9292: - [~aljoscha] based on [~StephanEwen]'s description, the TypeInfoParser is not working correctly, so what is the recommended way of handling it? > Remove TypeInfoParser > - > > Key: FLINK-9292 > URL: https://issues.apache.org/jira/browse/FLINK-9292 > Project: Flink > Issue Type: Task > Components: Core >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > > The {{TypeInfoParser}} has been deprecated, in favor of the {{TypeHint}}. > Because the TypeInfoParser is also not working correctly with respect to > classloading, we should remove it. Users still find the class, try to use it, > and run into problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9269) Concurrency problem in HeapKeyedStateBackend when performing checkpoint async
[ https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-9269. - Resolution: Fixed Merged in: master: 14e7d35f26 release-1.5: 3ba21adc0e > Concurrency problem in HeapKeyedStateBackend when performing checkpoint async > - > > Key: FLINK-9269 > URL: https://issues.apache.org/jira/browse/FLINK-9269 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Critical > Fix For: 1.6.0 > > > {code:java} > @Test > public void testConcurrencyProblem() throws Exception { > CheckpointStreamFactory streamFactory = createStreamFactory(); > Environment env = new DummyEnvironment(); > AbstractKeyedStateBackend<Integer> backend = > createKeyedBackend(IntSerializer.INSTANCE, env); > try { > long checkpointID = 0; > List<Future> futureList = new ArrayList<>(); > for (int i = 0; i < 10; ++i) { > ValueStateDescriptor<Integer> kvId = new > ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE); > ValueState<Integer> state = > backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId); > ((InternalValueState) > state).setCurrentNamespace(VoidNamespace.INSTANCE); > backend.setCurrentKey(i); > state.update(i); > > futureList.add(runSnapshotAsync(backend.snapshot(checkpointID++, > System.currentTimeMillis(), streamFactory, > CheckpointOptions.forCheckpointWithDefaultLocation()))); > } > for (Future future : futureList) { > future.get(); > } > } catch (Exception e) { > fail(); > } finally { > backend.dispose(); > } > } > protected Future runSnapshotAsync( > RunnableFuture<KeyedStateHandle> snapshotRunnableFuture) throws Exception { > if (!snapshotRunnableFuture.isDone()) { > return Executors.newFixedThreadPool(5).submit(() -> { > try { > snapshotRunnableFuture.run(); > snapshotRunnableFuture.get(); > } catch (Exception e) { > e.printStackTrace(); > fail(); > } > }); > } > return null; > } > {code} > Place the above code in `StateBackendTestBase` 
and run > `AsyncMemoryStateBackendTest`, and you will get the following exception > {code} > java.util.concurrent.ExecutionException: java.lang.NullPointerException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662) > at > org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84) > ... 5 more > java.util.concurrent.ExecutionException: java.lang.NullPointerException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at >
[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463522#comment-16463522 ] ASF GitHub Bot commented on FLINK-8620: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5580 > Enable shipping custom artifacts to BlobStore and accessing them through > DistributedCache > - > > Key: FLINK-8620 > URL: https://issues.apache.org/jira/browse/FLINK-8620 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > > We should be able to distribute custom files to taskmanagers. To do that we > can store those files in BlobStore and later on access them in TaskManagers > through DistributedCache. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9292) Remove TypeInfoParser
[ https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463521#comment-16463521 ] Aljoscha Krettek commented on FLINK-9292: - [~yanghua] Keep in mind that we can't remove this now, as it's marked {{@Public}}. > Remove TypeInfoParser > - > > Key: FLINK-9292 > URL: https://issues.apache.org/jira/browse/FLINK-9292 > Project: Flink > Issue Type: Task > Components: Core >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > > The {{TypeInfoParser}} has been deprecated, in favor of the {{TypeHint}}. > Because the TypeInfoParser is also not working correctly with respect to > classloading, we should remove it. Users still find the class, try to use it, > and run into problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
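For reference, the TypeHint replacement favored over TypeInfoParser works by capturing the generic type argument through an anonymous subclass. This stand-alone sketch shows the underlying reflection trick; it is an illustration of the idea, not Flink's TypeHint class itself:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeHintDemo {

    // Mimics the core of a type hint: an anonymous subclass "freezes" the
    // generic type argument so it survives erasure and can be read back.
    abstract static class Hint<T> {
        Type captured() {
            return ((ParameterizedType) getClass().getGenericSuperclass())
                    .getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        Type t = new Hint<List<String>>() {}.captured();
        System.out.println(t); // java.util.List<java.lang.String>
    }
}
```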
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5580 ---