[jira] [Commented] (FLINK-8020) Deadlock found in Async I/O operator
[ https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349926#comment-16349926 ] Stephan Ewen commented on FLINK-8020: - I think the original diagnosis is not quite correct. The thread locks {{0x0007b692ad98}} (which is the checkpoint lock) and then waits on the lock (as in {{Object.wait()}}), which means it waits for a notification. The thread is not blocked on the lock (note that the Java thread state WAITING is not BLOCKED). I am curious whether this is still a problem. The stack trace does not actually show a specific problem. At first glance, it looks as if the Async I/O operations (to HBase?) do not complete. Because of that, the pipeline stops and waits for those async I/O requests. > Deadlock found in Async I/O operator > > > Key: FLINK-8020 > URL: https://issues.apache.org/jira/browse/FLINK-8020 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming, Streaming Connectors >Affects Versions: 1.3.2 > Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode >Reporter: Weihua Jiang >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > Attachments: jstack53009(2).out, jstack67976-2.log > > > Our streaming job run into trouble in these days after a long time smooth > running. One issue we found is > [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this > one. > After analyzing the jstack, we believe we found a DEAD LOCK in flink: > 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" > hold lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940. > 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: > hbase-sink0 (8/8)" hold lock 0x0007b6aa1940 and is waiting for lock > 0x0007b6aa1788. > This DEADLOCK made the job fail to proceed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
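The distinction drawn in the comment above (a thread parked in {{Object.wait()}} versus a thread blocked while trying to enter a monitor) can be reproduced outside of Flink. The following is only a minimal sketch; the class and method names are made up for illustration and nothing here is taken from the Flink code base or from the attached jstack files.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of Flink.
final class WaitingVsBlocked {

    /** State of a thread that owns a monitor and then calls wait() on it. */
    static Thread.State stateWhileInWait() {
        final Object lock = new Object();
        final CountDownLatch ownsLock = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                ownsLock.countDown();
                try {
                    lock.wait(); // releases the monitor until a notification arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        waiter.setDaemon(true);
        waiter.start();
        try {
            ownsLock.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        synchronized (lock) {
            // This monitor can only be entered after the waiter released it in wait().
            Thread.State observed = waiter.getState();
            lock.notifyAll(); // let the waiter finish
            return observed;
        }
    }

    /** State of a thread that tries to enter a monitor owned by another thread. */
    static Thread.State stateWhileContending() {
        final Object lock = new Object();
        Thread contender = new Thread(() -> {
            synchronized (lock) {
                // Nothing to do; the point is the attempt to enter the monitor.
            }
        });
        contender.setDaemon(true);
        synchronized (lock) {
            contender.start();
            // Give the contender up to five seconds to reach the monitor.
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (contender.getState() != Thread.State.BLOCKED
                    && System.nanoTime() < deadline) {
                Thread.yield();
            }
            return contender.getState();
        }
    }

    public static void main(String[] args) {
        System.out.println("in Object.wait():       " + stateWhileInWait());
        System.out.println("contending for monitor: " + stateWhileContending());
    }
}
```

In a thread dump, the first case shows up as "waiting on" a lock the thread had previously locked, which is why it can look like a lock cycle at first glance even though the monitor is free for other threads.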
[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...
GitHub user zhijiangW opened a pull request: https://github.com/apache/flink/pull/5400 [FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data for exactly-once ## What is the purpose of the change *Currently, in exactly-once mode, the BarrierBuffer blocks inputs that have delivered a barrier until all inputs have received the barrier for a given checkpoint. To avoid back-pressuring the input streams, which may cause distributed deadlocks, the BarrierBuffer has to spill the data to disk files in order to recycle the buffers of blocked channels.* *With credit-based flow control, every channel has exclusive buffers, so there is no need to spill data to avoid deadlock. We therefore implement a new CheckpointBarrierHandler that only buffers the data of blocked channels, for better performance.* *The new CheckpointBarrierHandler can be enabled or disabled by configuration, so that the original mode can be restored if unexpected problems arise.* ## Brief change log - *Implement the new `CreditBasedBarrierBuffer` and `CreditBasedBufferBlocker` for buffering data in blocked channels in exactly-once mode.* - *Define the parameter `taskmanager.exactly-once.blocking.data.enabled` to enable or disable the new handler.* ## Verifying this change This change added tests and can be verified as follows: - *Added tests for the logic of `CreditBasedBarrierBuffer`* - *Added tests for the logic of `CreditBasedBufferBlocker`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhijiangW/flink FLINK-8547 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5400.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5400 commit 4d08e5d58c732e8f835016b48edc4494f8cb26fe Author: Zhijiang Date: 2018-02-02T07:45:49Z [FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data for exactly-once ---
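The idea in the pull request description (hold back data from channels that have already delivered their barrier, in memory rather than on disk, and release it once every channel has delivered the barrier) can be sketched as below. This is not the `CreditBasedBarrierBuffer` from the pull request: the class name, the use of `String` records, and the assumption of a single in-flight checkpoint are all simplifications made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch, not the handler implemented in the pull request.
final class InMemoryBarrierAligner {

    private final boolean[] blocked;                         // channels that already delivered the barrier
    private final Queue<String> heldBack = new ArrayDeque<>(); // data kept in memory, never spilled
    private int barriersSeen;

    InMemoryBarrierAligner(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    /** Returns the records that may be processed right now. */
    List<String> onRecord(int channel, String record) {
        List<String> ready = new ArrayList<>();
        if (blocked[channel]) {
            heldBack.add(record); // belongs to the next checkpoint epoch, so hold it back
        } else {
            ready.add(record);
        }
        return ready;
    }

    /** Returns the held-back records once the barrier has arrived on every channel. */
    List<String> onBarrier(int channel) {
        List<String> released = new ArrayList<>();
        if (!blocked[channel]) {
            blocked[channel] = true;
            barriersSeen++;
        }
        if (barriersSeen == blocked.length) {
            // Alignment is complete; a real handler would trigger the checkpoint here.
            released.addAll(heldBack);
            heldBack.clear();
            Arrays.fill(blocked, false);
            barriersSeen = 0;
        }
        return released;
    }
}
```

The sketch relies on the premise stated in the description: with exclusive buffers per channel, holding data for a blocked channel does not take buffers away from the channels that still need to deliver their barrier.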
[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349921#comment-16349921 ] ASF GitHub Bot commented on FLINK-8547: --- GitHub user zhijiangW opened a pull request: https://github.com/apache/flink/pull/5400 [FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data for exactly-once ## What is the purpose of the change *Currently in exactly-once mode, the BarrierBuffer would block inputs with barriers until all inputs have received the barrier for a given checkpoint. To avoid back-pressuring the input streams which may cause distributed deadlocks, the BarrierBuffer has to spill the data in disk files to recycle the buffers for blocked channels.* *Based on credit-based flow control, every channel has exclusive buffers, so it is no need to spill data for avoiding deadlock. Then we implement a new CheckpointBarrierHandler for only buffering the data for blocked channels for better performance.* *And this new CheckpointBarrierHandler can also be configured to use or not in order to rollback the original mode for unexpected risks.* ## Brief change log - *Implement the new `CreditBasedBarrierBuffer` and `CreditBasedBufferBlocker` for buffering data in blocked channels in exactly-once mode.* - *Define the parameter `taskmanager.exactly-once.blocking.data.enabled` for enabling the new handler or not.* ## Verifying this change This change added tests and can be verified as follows: - *Added tests for the logic of `CreditBasedBarrierBuffer`* - *Added tests for the logic of `CreditBasedBufferBlocker`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), 
Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhijiangW/flink FLINK-8547 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5400.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5400 commit 4d08e5d58c732e8f835016b48edc4494f8cb26fe Author: Zhijiang Date: 2018-02-02T07:45:49Z [FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data for exactly-once > Implement CheckpointBarrierHandler not to spill data for exactly-once based > on credit-based flow control > > > Key: FLINK-8547 > URL: https://issues.apache.org/jira/browse/FLINK-8547 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > > Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with > barriers until all inputs have received the barrier for a given checkpoint. > To avoid back-pressuring the input streams which may cause distributed > deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to > recycle the buffers for blocked channels. > > Based on credit-based flow control, every channel has exclusive buffers, so > it is no need to spill data for avoiding deadlock. Then we implement a new > {{CheckpointBarrierHandler}} for only buffering the data for blocked channels > for better performance. > > And this new {{CheckpointBarrierHandler}} can also be configured to use or > not in order to rollback the original mode for unexpected risks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
[ https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-8547: Description: Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with barriers until all inputs have received the barrier for a given checkpoint. To avoid back-pressuring the input streams which may cause distributed deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to recycle the buffers for blocked channels. Based on credit-based flow control, every channel has exclusive buffers, so it is no need to spill data for avoiding deadlock. Then we implement a new {{CheckpointBarrierHandler}} for only buffering the data for blocked channels for better performance. And this new {{CheckpointBarrierHandler}} can also be configured to use or not in order to rollback the original mode for unexpected risks. was: Currently in exactly-once mode, the `BarrierBuffer` would block inputs with barriers until all inputs have received the barrier for a given checkpoint. To avoid back-pressuring the input streams which may cause distributed deadlocks, the `BarrierBuffer` has to spill the data in disk files to recycle the buffers for blocked channels. Based on credit-based flow control, every channel has exclusive buffers, so it is no need to spill data for avoiding deadlock. Then we implement a new 'CheckpointBarrierHandler` for only buffering the data for blocked channels for better performance. And this new `CheckpointBarrierHandler` can also be configured to use or not in order to rollback the original mode for unexpected risks. 
> Implement CheckpointBarrierHandler not to spill data for exactly-once based > on credit-based flow control > > > Key: FLINK-8547 > URL: https://issues.apache.org/jira/browse/FLINK-8547 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > > Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with > barriers until all inputs have received the barrier for a given checkpoint. > To avoid back-pressuring the input streams which may cause distributed > deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to > recycle the buffers for blocked channels. > > Based on credit-based flow control, every channel has exclusive buffers, so > it is no need to spill data for avoiding deadlock. Then we implement a new > {{CheckpointBarrierHandler}} for only buffering the data for blocked channels > for better performance. > > And this new {{CheckpointBarrierHandler}} can also be configured to use or > not in order to rollback the original mode for unexpected risks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control
zhijiang created FLINK-8547: --- Summary: Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control Key: FLINK-8547 URL: https://issues.apache.org/jira/browse/FLINK-8547 Project: Flink Issue Type: Sub-task Components: Network Affects Versions: 1.5.0 Reporter: zhijiang Assignee: zhijiang Currently in exactly-once mode, the `BarrierBuffer` would block inputs with barriers until all inputs have received the barrier for a given checkpoint. To avoid back-pressuring the input streams which may cause distributed deadlocks, the `BarrierBuffer` has to spill the data in disk files to recycle the buffers for blocked channels. Based on credit-based flow control, every channel has exclusive buffers, so it is no need to spill data for avoiding deadlock. Then we implement a new 'CheckpointBarrierHandler` for only buffering the data for blocked channels for better performance. And this new `CheckpointBarrierHandler` can also be configured to use or not in order to rollback the original mode for unexpected risks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8020) Deadlock found in Async I/O operator
[ https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349913#comment-16349913 ] Stephan Ewen commented on FLINK-8020: - Updated the title to better describe the root problem. > Deadlock found in Async I/O operator > > > Key: FLINK-8020 > URL: https://issues.apache.org/jira/browse/FLINK-8020 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming, Streaming Connectors >Affects Versions: 1.3.2 > Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode >Reporter: Weihua Jiang >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > Attachments: jstack53009(2).out, jstack67976-2.log > > > Our streaming job run into trouble in these days after a long time smooth > running. One issue we found is > [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this > one. > After analyzing the jstack, we believe we found a DEAD LOCK in flink: > 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" > hold lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940. > 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: > hbase-sink0 (8/8)" hold lock 0x0007b6aa1940 and is waiting for lock > 0x0007b6aa1788. > This DEADLOCK made the job fail to proceed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8020) Deadlock found in Async I/O operator
[ https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-8020: Summary: Deadlock found in Async I/O operator (was: Deadlock found in Flink Streaming job) > Deadlock found in Async I/O operator > > > Key: FLINK-8020 > URL: https://issues.apache.org/jira/browse/FLINK-8020 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming, Streaming Connectors >Affects Versions: 1.3.2 > Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode >Reporter: Weihua Jiang >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > Attachments: jstack53009(2).out, jstack67976-2.log > > > Our streaming job run into trouble in these days after a long time smooth > running. One issue we found is > [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this > one. > After analyzing the jstack, we believe we found a DEAD LOCK in flink: > 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" > hold lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940. > 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: > hbase-sink0 (8/8)" hold lock 0x0007b6aa1940 and is waiting for lock > 0x0007b6aa1788. > This DEADLOCK made the job fail to proceed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6763) Inefficient PojoSerializerConfigSnapshot serialization format
[ https://issues.apache.org/jira/browse/FLINK-6763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349909#comment-16349909 ] Stephan Ewen commented on FLINK-6763: - As a related comment: I think the whole snapshot procedure can be optimized a bit. We can create the serializer snapshot once, keep the bytes, and add those to every checkpoint. In programs with small state, the majority of the checkpoint time can be spent on serializer snapshots (still only milliseconds, but there is optimization potential nonetheless). > Inefficient PojoSerializerConfigSnapshot serialization format > - > > Key: FLINK-6763 > URL: https://issues.apache.org/jira/browse/FLINK-6763 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing, Type Serialization System >Affects Versions: 1.3.0, 1.4.0 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > The {{PojoSerializerConfigSnapshot}} stores for each serializer the beginning > offset and ending offset in the serialization stream. This information is > also written if the serializer serialization is supposed to be ignored. The > beginning and ending offsets are stored as a sequence of integers at the > beginning of the serialization stream. We store this information to skip > broken serializers. > I think we don't need both offsets. Instead I would suggest to write the > length of the serialized serializer first into the serialization stream and > then the serialized serializer. This can be done in > {{TypeSerializerSerializationUtil.writeSerializer}}. When reading the > serializer via {{TypeSerializerSerializationUtil.tryReadSerializer}}, we can > try to deserialize the serializer. If this operation fails, then we can skip > the number of serialized serializer because we know how long it was. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
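The length-prefix format suggested in the quoted issue can be illustrated with plain `java.io` streams. This is a sketch under assumptions, not Flink's `TypeSerializerSerializationUtil`: the class name is hypothetical, and strict UTF-8 decoding stands in for "deserializing a serializer", with malformed bytes playing the role of a broken serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code.
final class LengthPrefixedEntries {

    /** Writes each entry as [length][payload] instead of keeping a separate offset table. */
    static byte[] writeAll(List<byte[]> entries) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(entries.size());
            for (byte[] entry : entries) {
                out.writeInt(entry.length);
                out.write(entry);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads all entries; an entry that cannot be decoded becomes null and is skipped over. */
    static List<String> readAll(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int count = in.readInt();
            List<String> result = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                int length = in.readInt();
                byte[] payload = new byte[length];
                in.readFully(payload); // the stream now sits at the next entry, whatever happens below
                result.add(tryDecode(payload));
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Stand-in for deserialization: strict UTF-8 decoding that fails on malformed input. */
    private static String tryDecode(byte[] payload) {
        try {
            return StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(payload)).toString();
        } catch (CharacterCodingException e) {
            return null; // broken entry; its bytes were already consumed using the length prefix
        }
    }
}
```

Because the payload is consumed by length before any decoding is attempted, a failure in one entry cannot leave the stream at an unknown position, which is the property the beginning and ending offsets were providing.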
[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore
[ https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349905#comment-16349905 ] ASF GitHub Bot commented on FLINK-8421: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5362#discussion_r165573730 --- Diff: flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.InputStreamViewWrapper; + +import java.io.IOException; +import java.io.PushbackInputStream; +import java.util.Arrays; + +/** + * A {@link VersionedIOReadableWritable} which allows to differentiate whether the previous + * data was versioned with a {@link VersionedIOReadableWritable}. This can be used if previously + * written data was not versioned, and is to be migrated to a versioned format. 
+ */ +@Internal +public abstract class PostVersionedIOReadableWritable extends VersionedIOReadableWritable { + + /** NOTE: CANNOT CHANGE! */ + private static final byte[] VERSIONED_IDENTIFIER = new byte[] {-15, -51, -123, -97}; + + /** +* Read from the provided {@link DataInputView in}. A flag {@code wasVersioned} can be +* used to determine whether or not the data to read was previously written +* by a {@link VersionedIOReadableWritable}. +*/ + protected abstract void read(DataInputView in, boolean wasVersioned) throws IOException; + + @Override + public void write(DataOutputView out) throws IOException { + out.write(VERSIONED_IDENTIFIER); + super.write(out); + } + + /** +* This read attempts to first identify if the input view contains the special +* {@link #VERSIONED_IDENTIFIER} by reading and buffering the first few bytes. +* If identified to be versioned, the usual version resolution read path +* in {@link VersionedIOReadableWritable#read(DataInputView)} is invoked. +* Otherwise, we "reset" the input view by pushing back the read buffered bytes +* into the stream. +*/ + @Override + public final void read(DataInputView in) throws IOException { + PushbackInputStream stream = new PushbackInputStream(new InputStreamViewWrapper(in), VERSIONED_IDENTIFIER.length); + + byte[] tmp = new byte[VERSIONED_IDENTIFIER.length]; + stream.read(tmp); + + if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) { + super.read(in); + read(in, true); + } else { + stream.unread(tmp); + read(new DataInputViewStreamWrapper(stream), false); + } --- End diff -- The not-so-nice things about the current implementation are: 1) it requires several layers of converting back and forth between a `DataInputView` and an `InputStream`, and 2) it uses a separate `read(DataInputView, boolean)` method in order to wrap a "reset" `DataInputView` for the remaining reads. 
I think the implementation would have been much more elegant if `DataInputView` had an `unread(byte[])` method, though I'm not sure how non-trivial it would be to support this across all subclasses. Maybe food for thought for the future. > HeapInternalTimerService should reconfigure compatible key / namespace > serializers on restore > - > > Key: FLINK-8421 > URL: https://issues.apache.org/jira/browse/FLINK-8421 > Project:
[GitHub] flink pull request #5362: [FLINK-8421] [DataStream] Make timer serializers r...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5362#discussion_r165573730 --- Diff: flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.InputStreamViewWrapper; + +import java.io.IOException; +import java.io.PushbackInputStream; +import java.util.Arrays; + +/** + * A {@link VersionedIOReadableWritable} which allows to differentiate whether the previous + * data was versioned with a {@link VersionedIOReadableWritable}. This can be used if previously + * written data was not versioned, and is to be migrated to a versioned format. + */ +@Internal +public abstract class PostVersionedIOReadableWritable extends VersionedIOReadableWritable { + + /** NOTE: CANNOT CHANGE! 
*/ + private static final byte[] VERSIONED_IDENTIFIER = new byte[] {-15, -51, -123, -97}; + + /** +* Read from the provided {@link DataInputView in}. A flag {@code wasVersioned} can be +* used to determine whether or not the data to read was previously written +* by a {@link VersionedIOReadableWritable}. +*/ + protected abstract void read(DataInputView in, boolean wasVersioned) throws IOException; + + @Override + public void write(DataOutputView out) throws IOException { + out.write(VERSIONED_IDENTIFIER); + super.write(out); + } + + /** +* This read attempts to first identify if the input view contains the special +* {@link #VERSIONED_IDENTIFIER} by reading and buffering the first few bytes. +* If identified to be versioned, the usual version resolution read path +* in {@link VersionedIOReadableWritable#read(DataInputView)} is invoked. +* Otherwise, we "reset" the input view by pushing back the read buffered bytes +* into the stream. +*/ + @Override + public final void read(DataInputView in) throws IOException { + PushbackInputStream stream = new PushbackInputStream(new InputStreamViewWrapper(in), VERSIONED_IDENTIFIER.length); + + byte[] tmp = new byte[VERSIONED_IDENTIFIER.length]; + stream.read(tmp); + + if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) { + super.read(in); + read(in, true); + } else { + stream.unread(tmp); + read(new DataInputViewStreamWrapper(stream), false); + } --- End diff -- The not-so-nice things about the current implementation are: 1) it requires several layers of converting back and forth between a `DataInputView` and an `InputStream`, and 2) it uses a separate `read(DataInputView, boolean)` method in order to wrap a "reset" `DataInputView` for the remaining reads. I think the implementation would have been much more elegant if `DataInputView` had an `unread(byte[])` method, though I'm not sure how non-trivial it would be to support this across all subclasses. Maybe food for thought for the future. ---
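The "peek at the first bytes, push them back if they are not the identifier" technique from the diff can be tried in isolation with only `java.io`. The sketch below is a standalone illustration, not the Flink class: `MagicHeaderPeek` and its methods are hypothetical names, and the loop that handles short reads is a defensive variation added here rather than something taken from the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical standalone demo, not part of Flink.
final class MagicHeaderPeek {

    // Same byte values as VERSIONED_IDENTIFIER in the diff above.
    static final byte[] MAGIC = {-15, -51, -123, -97};

    /**
     * Consumes the identifier if the stream starts with it. Otherwise pushes back
     * exactly the bytes that were read, so the caller sees the stream from the start.
     * The stream must have a pushback buffer of at least MAGIC.length bytes.
     */
    static boolean consumeMagicIfPresent(PushbackInputStream in) throws IOException {
        byte[] head = new byte[MAGIC.length];
        int n = 0;
        while (n < head.length) { // a single read() call may return fewer bytes than requested
            int r = in.read(head, n, head.length - n);
            if (r < 0) {
                break; // end of stream before a full identifier was read
            }
            n += r;
        }
        if (n == MAGIC.length && Arrays.equals(head, MAGIC)) {
            return true;
        }
        in.unread(head, 0, n); // "reset" by pushing back only what was actually read
        return false;
    }

    /** Reports which path was taken and how many bytes remain for the caller. */
    static String probe(byte[] data) {
        try {
            PushbackInputStream in =
                    new PushbackInputStream(new ByteArrayInputStream(data), MAGIC.length);
            boolean versioned = consumeMagicIfPresent(in);
            int remaining = 0;
            while (in.read() >= 0) {
                remaining++;
            }
            return (versioned ? "versioned" : "legacy") + ":" + remaining;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(probe(new byte[] {-15, -51, -123, -97, 1, 2}));
        System.out.println(probe(new byte[] {1, 2, 3, 4, 5}));
    }
}
```

An `unread(byte[])` method on the input abstraction itself, as the review comment wishes for, would make the wrapper stream and the second read path unnecessary.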
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349869#comment-16349869 ] ASF GitHub Bot commented on FLINK-8516: --- Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai the PR is ready for review now > FlinkKinesisConsumer does not balance shards over subtasks > -- > > Key: FLINK-8516 > URL: https://issues.apache.org/jira/browse/FLINK-8516 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2, 1.5.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > > The hash code of the shard is used to distribute discovered shards over > subtasks round robin. This works as long as shard identifiers are sequential. > After shards are rebalanced in Kinesis, that may no longer be the case and > the distribution become skewed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
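The skew described in the issue can be shown with a toy model in which a shard is identified by a number and assigned to subtask `hash mod parallelism`. Everything in this sketch is an assumption made for illustration: the class name, the numeric shard ids, and the position-based alternative are not the FlinkKinesisConsumer API or the change proposed in the pull request.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical toy model, not the Kinesis connector.
final class ShardAssignmentSketch {

    /** Counts how many shards each subtask receives for a given hash function. */
    static int[] shardsPerSubtask(
            List<Integer> shardIds, int parallelism, ToIntFunction<Integer> hash) {
        int[] counts = new int[parallelism];
        for (Integer id : shardIds) {
            counts[Math.floorMod(hash.applyAsInt(id), parallelism)]++;
        }
        return counts;
    }

    /** Uses the shard id itself as the hash: balanced only while ids are sequential. */
    static int[] byId(List<Integer> shardIds, int parallelism) {
        return shardsPerSubtask(shardIds, parallelism, id -> id);
    }

    /** A custom hash (position in the discovered list) that ignores gaps between ids. */
    static int[] byPosition(List<Integer> shardIds, int parallelism) {
        return shardsPerSubtask(shardIds, parallelism, shardIds::indexOf);
    }

    public static void main(String[] args) {
        List<Integer> sequential = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
        List<Integer> afterResharding = Arrays.asList(0, 4, 8, 12); // made-up ids with gaps
        System.out.println(Arrays.toString(byId(sequential, 4)));
        System.out.println(Arrays.toString(byId(afterResharding, 4)));
        System.out.println(Arrays.toString(byPosition(afterResharding, 4)));
    }
}
```

With the made-up ids above, the id-based assignment places every shard on the same subtask, while a pluggable hash function lets a job pick a mapping that matches the ids its stream actually has.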
[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai the PR is ready for review now ---
[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios
[ https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349853#comment-16349853 ] ASF GitHub Bot commented on FLINK-6206: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5399 Could you modify the title to `[FLINK-6206] [runtime] Use LOG.error() when logging failure state changes`? There's actually a JIRA ticket that covers this change. > Log task state transitions as warn/error for FAILURE scenarios > -- > > Key: FLINK-6206 > URL: https://issues.apache.org/jira/browse/FLINK-6206 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.2.0 >Reporter: Dan Bress >Priority: Critical > > If a task fails due to an exception, I would like that to be logged at a warn > or an error level. currently its info > {code} > private boolean transitionState(ExecutionState currentState, ExecutionState > newState, Throwable cause) { > if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { > if (cause == null) { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState); > } else { > LOG.info("{} ({}) switched from {} to {}.", > taskNameWithSubtask, executionId, currentState, newState, cause); > } > return true; > } else { > return false; > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
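The change requested in the issue (log the transition at a higher level when a failure cause is present) could look roughly like the sketch below. It is not the patch from the pull request: it uses `java.util.logging` only so that the example has no dependencies, the class and the reduced `ExecutionState` enum are hypothetical, and `Level.SEVERE` stands in for the error level of whatever logging API the real code uses.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch, not Flink's Task class.
final class StateTransitionLogging {

    enum ExecutionState { RUNNING, FINISHED, FAILED }

    private static final Logger LOG = Logger.getLogger("StateTransitionLogging");

    private final AtomicReference<ExecutionState> state =
            new AtomicReference<>(ExecutionState.RUNNING);

    /** Transitions without a cause stay at INFO; transitions caused by an exception are errors. */
    static Level levelFor(Throwable cause) {
        return cause == null ? Level.INFO : Level.SEVERE;
    }

    boolean transitionState(ExecutionState current, ExecutionState next, Throwable cause) {
        if (!state.compareAndSet(current, next)) {
            return false; // another thread changed the state first
        }
        // The cause may be null; in that case only the message is logged.
        LOG.log(levelFor(cause), "task switched from " + current + " to " + next + ".", cause);
        return true;
    }
}
```

Keeping the level decision in one small method means the normal and the failure path share a single log statement instead of two nearly identical branches.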
[GitHub] flink issue #5399: [hotfix] Use LOG.error() when logging failure state chang...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5399 Could you modify the title to `[FLINK-6206] [runtime] Use LOG.error() when logging failure state changes`? There's actually a JIRA ticket that covers this change. ---
[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349806#comment-16349806 ] ASF GitHub Bot commented on FLINK-7923: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5367#discussion_r165560261 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.utils.TableTestUtil._ +import org.junit.Test + +class CalcTest extends TableTestBase { + + @Test + def testArrayElement(): Unit = { +val util = streamTestUtil() +util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b) + +val expected = unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", +"a", +"DOT(ITEM(b, 1), '_1') AS b11" --- End diff -- Will fix it. 
> SQL parser exception when accessing subfields of a Composite element in an > Object Array type column > --- > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5367#discussion_r165560261 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.utils.TableTestUtil._ +import org.junit.Test + +class CalcTest extends TableTestBase { + + @Test + def testArrayElement(): Unit = { +val util = streamTestUtil() +util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b) + +val expected = unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", +"a", +"DOT(ITEM(b, 1), '_1') AS b11" --- End diff -- Will fix it. ---
[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349805#comment-16349805 ] ASF GitHub Bot commented on FLINK-7923: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5367#discussion_r165560235 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala --- @@ -984,6 +987,63 @@ object ScalarOperators { } } + def generateDot(codeGenerator: CodeGenerator, --- End diff -- But I agree I should refactor the code in visitFieldAccess() to reuse it as much as possible. > SQL parser exception when accessing subfields of a Composite element in an > Object Array type column > --- > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5367#discussion_r165560235 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala --- @@ -984,6 +987,63 @@ object ScalarOperators { } } + def generateDot(codeGenerator: CodeGenerator, --- End diff -- But I agree I should refactor the code in visitFieldAccess() to reuse it as much as possible. ---
[GitHub] flink pull request #5399: [hotfix] Use LOG.error() when logging failure stat...
GitHub user casidiablo opened a pull request: https://github.com/apache/flink/pull/5399 [hotfix] Use LOG.error() when logging failure state changes It's very inconvenient to have these logged with `INFO`. It makes it hard to detect errors when inspecting logs with, say, Kibana. You can merge this pull request into a Git repository by running: $ git pull https://github.com/casidiablo/flink use-error-log Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5399.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5399 commit 62160ed6e3ab9c0336bdb8869b59d38a2aab30ee Author: Cristian Date: 2018-02-02T03:48:45Z Use LOG.error() when logging failure state changes ---
[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349724#comment-16349724 ] Hequn Cheng edited comment on FLINK-8545 at 2/2/18 3:30 AM: Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's start with the {{DataStream}} to {{Table}} conversion first. I like your proposal especially for the upsert check. Besides, we may also need to support ingesting with delete messages. Something looks like: {code:java} DataStream[(String, Long, Int, Boolean)] input = ??? Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 'd.isDelete) {code} But, this may bring some side effects, as current aggregations assume that all deletes are on condition of deleting an existing record. I don't have a complete solution yet. Maybe we need some refactors to adapt this. I will think more about it. Thanks. was (Author: hequn8128): Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's start with the {{DataStream}} to {{Table}} conversion first. I like your proposal especially for the upsert check. Besides, we may also need to support ingesting with delete messages. Something looks like: {code:java} DataStream[(String, Long, Int, Boolean)] input = ??? Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 'd.isDelete) {code} But, this may bring some side effects, as current aggregations assume that all deletes are on condition of deleting an existing record. I don't have a complete solution yet. Maybe we may need some refactors to adapt this. I will think more about it. Thanks. 
> Implement upsert stream table source > - > > Key: FLINK-8545 > URL: https://issues.apache.org/jira/browse/FLINK-8545 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As more and more users are eager for ingesting data with upsert mode in flink > sql/table-api, it is valuable to enable table source with upsert mode. I will > provide a design doc later and we can have more discussions. Any suggestions > are warmly welcomed ! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349724#comment-16349724 ] Hequn Cheng commented on FLINK-8545: Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's start with the {{DataStream}} to {{Table}} conversion first. I like your proposal, especially for the upsert check. Besides, we may also need to support ingesting delete messages. Something like: {code:java} DataStream[(String, Long, Int, Boolean)] input = ??? Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 'd.isDelete) {code} However, this may have side effects, because the current aggregations assume that every delete removes an existing record. I don't have a complete solution yet. We may need some refactoring to accommodate this. I will think more about it. Thanks. > Implement upsert stream table source > - > > Key: FLINK-8545 > URL: https://issues.apache.org/jira/browse/FLINK-8545 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As more and more users are eager for ingesting data with upsert mode in flink > sql/table-api, it is valuable to enable table source with upsert mode. I will > provide a design doc later and we can have more discussions. Any suggestions > are warmly welcomed ! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
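The concern raised in the comment above, that a delete message may arrive for a key that has no existing record, can be illustrated with a minimal sketch in plain Java. This is not Flink code and does not use the proposed `upsertFromStream` call; the `Message` and `View` classes and their fields are hypothetical names chosen only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UpsertStreamSketch {

    /** Hypothetical message shape: a key, a value, and a delete flag. */
    static final class Message {
        final String key;
        final long value;
        final boolean isDelete;

        Message(String key, long value, boolean isDelete) {
            this.key = key;
            this.value = value;
            this.isDelete = isDelete;
        }
    }

    /** Materialized view of the stream, plus a counter for deletes that matched no record. */
    static final class View {
        final Map<String, Long> rows = new LinkedHashMap<>();
        int deletesWithoutRecord = 0;

        void apply(Message m) {
            if (m.isDelete) {
                // remove() returns null when the key was never inserted (values are never null here).
                if (rows.remove(m.key) == null) {
                    deletesWithoutRecord++;
                }
            } else {
                // Upsert: insert the key or overwrite its previous value.
                rows.put(m.key, m.value);
            }
        }
    }

    public static void main(String[] args) {
        View view = new View();
        view.apply(new Message("a", 1L, false)); // insert a
        view.apply(new Message("a", 2L, false)); // update a
        view.apply(new Message("b", 0L, true));  // delete for a key that was never seen
        System.out.println(view.rows + ", unmatched deletes: " + view.deletesWithoutRecord);
    }
}
```

An aggregation that retracts a value on every delete would be wrong for the third message, which is the case the comment says still needs a solution.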
[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349724#comment-16349724 ] Hequn Cheng edited comment on FLINK-8545 at 2/2/18 3:29 AM: Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's start with the {{DataStream}} to {{Table}} conversion first. I like your proposal especially for the upsert check. Besides, we may also need to support ingesting with delete messages. Something looks like: {code:java} DataStream[(String, Long, Int, Boolean)] input = ??? Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 'd.isDelete) {code} But, this may bring some side effects, as current aggregations assume that all deletes are on condition of deleting an existing record. I don't have a complete solution yet. Maybe we may need some refactors to adapt this. I will think more about it. Thanks. was (Author: hequn8128): Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's start with the \{{DataStream}} to \{{Table}} conversion first. I like your proposal especially for the upsert check. Besides, we may also need to support ingesting with delete messages. Something looks like: {code:java} DataStream[(String, Long, Int, Boolean)] input = ??? Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 'd.isDelete) {code} But, this may bring some side effects, as current aggregations assume that all deletes are on condition of deleting an existing record. I don't have a complete solution yet. Maybe we may need some refactors to adapt this. I will think more about it. Thanks. 
> Implement upsert stream table source > - > > Key: FLINK-8545 > URL: https://issues.apache.org/jira/browse/FLINK-8545 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As more and more users are eager for ingesting data with upsert mode in flink > sql/table-api, it is valuable to enable table source with upsert mode. I will > provide a design doc later and we can have more discussions. Any suggestions > are warmly welcomed ! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349559#comment-16349559 ] ASF GitHub Bot commented on FLINK-7923: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5367#discussion_r165527528 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala --- @@ -984,6 +987,63 @@ object ScalarOperators { } } + def generateDot(codeGenerator: CodeGenerator, --- End diff -- In Calcite, once it sees an array element access, the subsequent field access is translated into a DOT RexCall, not a RexFieldAccess. Therefore, we need custom handling for the DOT RexCall. > SQL parser exception when accessing subfields of a Composite element in an > Object Array type column > --- > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5367#discussion_r165527528 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala --- @@ -984,6 +987,63 @@ object ScalarOperators { } } + def generateDot(codeGenerator: CodeGenerator, --- End diff -- In Calcite, once it sees an array element access, the subsequent field access is translated into a DOT RexCall, not a RexFieldAccess. Therefore, we need custom handling for the DOT RexCall. ---
[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349458#comment-16349458 ] ASF GitHub Bot commented on FLINK-7923: --- Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/5367 @hequn8128 , thanks for the review. Where do you think we can add it in sql.md? > SQL parser exception when accessing subfields of a Composite element in an > Object Array type column > --- > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349454#comment-16349454 ] ASF GitHub Bot commented on FLINK-7923: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5367#discussion_r165515039 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testArrayElementAtFromTableForTuple(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear + +val data = List( + (1, Array((12, 45), (2, 5))), --- End diff -- Added null check. For nested tuple input, it won't work for now due to https://issues.apache.org/jira/browse/CALCITE-2162. I've submitted a fix for it, which should be available in Calcite 1.16. > SQL parser exception when accessing subfields of a Composite element in an > Object Array type column > --- > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Shuyi Chen >Priority: Major > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...
Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/5367 @hequn8128 , thanks for the review. Where do you think we can add it in sql.md? ---
[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5367#discussion_r165515039 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testArrayElementAtFromTableForTuple(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear + +val data = List( + (1, Array((12, 45), (2, 5))), --- End diff -- Added null check. For nested tuple input, it won't work for now due to https://issues.apache.org/jira/browse/CALCITE-2162. I've submitted a fix for it, which should be available in Calcite 1.16. ---
[jira] [Commented] (FLINK-8472) Extend migration tests for Flink 1.4
[ https://issues.apache.org/jira/browse/FLINK-8472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349131#comment-16349131 ] ASF GitHub Bot commented on FLINK-8472: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5364#discussion_r165459894 --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java --- @@ -100,9 +100,7 @@ public static void main(String[] args) throws Exception { .map(new StatefulStringStoringMap(mode, "first")) .setParallelism(4); - if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { --- End diff -- yeah it should be alright to remove that, but let's chain the uid call to the operator creation as we do for the others for style points. > Extend migration tests for Flink 1.4 > > > Key: FLINK-8472 > URL: https://issues.apache.org/jira/browse/FLINK-8472 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > The following migration tests should be extended to cover migrating Flink > jobs with a 1.4 savepoint. > * {{WindowOperatorMigrationTest}} > * {{CEPMigrationTest}} > * {{StatefulJobSavepointMigrationTestITCase}} > * {{FlinkKinesisConsumerMigrationTest}} > * {{FlinkKafkaConsumerBaseMigrationTest}} > * {{ContinuousFileProcessingMigrationTest}} > * {{BucketingSinkMigrationTest}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5364: [FLINK-8472] [test] Extend all migration tests for...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5364#discussion_r165459894 --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java --- @@ -100,9 +100,7 @@ public static void main(String[] args) throws Exception { .map(new StatefulStringStoringMap(mode, "first")) .setParallelism(4); - if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { --- End diff -- yeah it should be alright to remove that, but let's chain the uid call to the operator creation as we do for the others for style points. ---
[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks
[ https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349100#comment-16349100 ] ASF GitHub Bot commented on FLINK-8516: --- GitHub user tweise reopened a pull request: https://github.com/apache/flink/pull/5393 [FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer ## What is the purpose of the change Allow the user to customize Kinesis shard to subtask assignment in the Kinesis consumer. ## Brief change log Added pluggable shard assigner. ## Verifying this change Added unit test. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation Javadoc You can merge this pull request into a Git repository by running: $ git pull https://github.com/tweise/flink FLINK-8516.shardHashing Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5393.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5393 commit ad4dbe6fb5bf2af52726c54b6361089ef3f4e369 Author: Thomas Weise Date: 2018-01-31T01:44:44Z [FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer > FlinkKinesisConsumer does not balance shards over subtasks > -- > > Key: FLINK-8516 > URL: https://issues.apache.org/jira/browse/FLINK-8516 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.4.0, 1.3.2, 1.5.0 >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > > The hash code of 
the shard is used to distribute discovered shards over > subtasks round robin. This works as long as shard identifiers are sequential. > After shards are rebalanced in Kinesis, that may no longer be the case and > the distribution becomes skewed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
GitHub user tweise reopened a pull request: https://github.com/apache/flink/pull/5393 [FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer ## What is the purpose of the change Allow the user to customize Kinesis shard to subtask assignment in the Kinesis consumer. ## Brief change log Added pluggable shard assigner. ## Verifying this change Added unit test. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation Javadoc You can merge this pull request into a Git repository by running: $ git pull https://github.com/tweise/flink FLINK-8516.shardHashing Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5393.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5393 commit ad4dbe6fb5bf2af52726c54b6361089ef3f4e369 Author: Thomas Weise Date: 2018-01-31T01:44:44Z [FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer ---
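The skew described in FLINK-8516 and the idea of a pluggable shard assigner can be sketched in plain Java. This is only an illustration under stated assumptions: the `ShardAssigner` interface, the `sequenceOf` helper, and the shard id pattern are hypothetical and are not taken from the pull request or from the Kinesis connector.

```java
import java.util.Arrays;
import java.util.List;

public class ShardAssignmentSketch {

    /** Hypothetical plug-in point; not the interface added by the pull request. */
    interface ShardAssigner {
        int assign(String shardId, int numSubtasks);
    }

    /** Extracts the numeric suffix of an id such as "shardId-000000000004". */
    static long sequenceOf(String shardId) {
        return Long.parseLong(shardId.substring(shardId.lastIndexOf('-') + 1));
    }

    /** Counts how many shards each subtask receives under the given assigner. */
    static int[] distribution(List<String> shardIds, int numSubtasks, ShardAssigner assigner) {
        int[] counts = new int[numSubtasks];
        for (String id : shardIds) {
            counts[assigner.assign(id, numSubtasks)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed situation after resharding: only even-numbered shards remain open.
        List<String> shards = Arrays.asList(
            "shardId-000000000000", "shardId-000000000002",
            "shardId-000000000004", "shardId-000000000006");

        // Plain modulo on the sequence number: every shard lands on subtask 0.
        ShardAssigner modulo = (id, n) -> (int) (sequenceOf(id) % n);
        // A user-supplied mapping that knows the ids advance in steps of two.
        ShardAssigner custom = (id, n) -> (int) ((sequenceOf(id) / 2) % n);

        System.out.println(Arrays.toString(distribution(shards, 2, modulo))); // [4, 0]
        System.out.println(Arrays.toString(distribution(shards, 2, custom))); // [2, 2]
    }
}
```

The point of the sketch is only that a fixed mapping works while identifiers are sequential and can collapse onto few subtasks once they are not, which is why letting the user supply the mapping helps.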
[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore
[ https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348936#comment-16348936 ] ASF GitHub Bot commented on FLINK-8421: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5362 @aljoscha As discussed offline, I've: - replaced `ByteArrayPrependedInputStream` with Java's `PushbackInputStream` - used negative values in `VERSIONED_IDENTIFIER` to be extra safe > HeapInternalTimerService should reconfigure compatible key / namespace > serializers on restore > - > > Key: FLINK-8421 > URL: https://issues.apache.org/jira/browse/FLINK-8421 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0, 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.3, 1.5.0, 1.4.1 > > > The {{HeapInternalTimerService}} still uses simple {{equals}} checks on > restored / newly provided serializers for compatibility checks. This should > be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, > so that new serializers can be reconfigured. > This would entail that the {{TypeSerializerConfiguration}} of the key and > namespace serializer in the {{HeapInternalTimerService}} also needs to be > written to the raw state. > For the Flink 1.4.0 release and current master, this is a critical bug since the > {{KryoSerializer}} has different default base registrations than before due > to FLINK-7420, i.e., if the key of a window is serialized using the > {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0. > For 1.3.x, this fix would be an improvement, such that the > {{HeapInternalTimerService}} restore will make use of serializer > reconfiguration. > Other remarks: > * We need to double check all operators that checkpoint / restore from > **raw** state. Apparently, the serializer compatibility checks were only > implemented for managed state. > * Migration ITCases apparently do not have enough coverage. 
A migration test > job that uses a key type which required the {{KryoSerializer}}, and uses > windows, would have caught this issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5362: [FLINK-8421] [DataStream] Make timer serializers reconfig...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5362 @aljoscha As discussed offline, I've: - replaced `ByteArrayPrependedInputStream` with Java's `PushbackInputStream` - used negative values in `VERSIONED_IDENTIFIER` to be extra safe ---
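For readers unfamiliar with the technique mentioned in the comment, here is a minimal sketch of how `java.io.PushbackInputStream` can be used to probe a stream for a leading marker and put the bytes back when the marker is absent. The marker bytes, the class name, and the helper methods are assumptions made for this illustration; the actual `VERSIONED_IDENTIFIER` value and the code in the pull request are not shown in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.util.Arrays;

public class VersionedHeaderSketch {

    // Assumed marker made of negative byte values; the real VERSIONED_IDENTIFIER is not shown in the thread.
    static final byte[] MARKER = {-15, -51, -123, -97};

    /** Wraps raw bytes in a stream whose pushback buffer can hold one full marker. */
    static PushbackInputStream wrap(byte[] data) {
        return new PushbackInputStream(new ByteArrayInputStream(data), MARKER.length);
    }

    /** Consumes the marker if the stream starts with it; otherwise leaves the stream unchanged. */
    static boolean consumeMarkerIfPresent(PushbackInputStream in) throws IOException {
        byte[] head = new byte[MARKER.length];
        int read = 0;
        while (read < head.length) {
            int n = in.read(head, read, head.length - read);
            if (n < 0) {
                break; // stream ended before a full marker could be read
            }
            read += n;
        }
        if (read == MARKER.length && Arrays.equals(head, MARKER)) {
            return true;
        }
        // Not a versioned stream: push back everything that was read so callers see the original bytes.
        in.unread(head, 0, read);
        return false;
    }

    public static void main(String[] args) throws IOException {
        PushbackInputStream versioned = wrap(new byte[] {-15, -51, -123, -97, 7});
        PushbackInputStream legacy = wrap(new byte[] {1, 2, 3, 4, 5});

        System.out.println(consumeMarkerIfPresent(versioned) + " next=" + versioned.read());
        System.out.println(consumeMarkerIfPresent(legacy) + " next=" + legacy.read());
    }
}
```

A marker built from negative byte values is less likely to collide with the first bytes of older, unversioned data, which is one plausible reading of "to be extra safe" in the comment.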
[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher
[ https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348895#comment-16348895 ] ASF GitHub Bot commented on FLINK-8308: --- Github user StevenLangbroek commented on the issue: https://github.com/apache/flink/pull/5395 @alpinegizmo hawkins is back. > Update yajl-ruby dependency to 1.3.1 or higher > -- > > Key: FLINK-8308 > URL: https://issues.apache.org/jira/browse/FLINK-8308 > Project: Flink > Issue Type: Task > Components: Project Website >Reporter: Fabian Hueske >Assignee: Steven Langbroek >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > We got notified that yajl-ruby < 1.3.1, a dependency which is used to build > the Flink website, has a security vulnerability of high severity. > We should update yajl-ruby to 1.3.1 or higher. > Since the website is built offline and served as static HTML, I don't think > this is a super critical issue (please correct me if I'm wrong), but we > should resolve this soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user StevenLangbroek commented on the issue: https://github.com/apache/flink/pull/5395 @alpinegizmo hawkins is back. ---
[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher
[ https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348869#comment-16348869 ] ASF GitHub Bot commented on FLINK-8308: --- Github user StevenLangbroek commented on the issue: https://github.com/apache/flink/pull/5395 @alpinegizmo if we can't upgrade Jekyll we can't do what the ticket set out to do: get rid of yajl-ruby. > Update yajl-ruby dependency to 1.3.1 or higher > -- > > Key: FLINK-8308 > URL: https://issues.apache.org/jira/browse/FLINK-8308 > Project: Flink > Issue Type: Task > Components: Project Website >Reporter: Fabian Hueske >Assignee: Steven Langbroek >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > We got notified that yajl-ruby < 1.3.1, a dependency which is used to build > the Flink website, has a security vulnerability of high severity. > We should update yajl-ruby to 1.3.1 or higher. > Since the website is built offline and served as static HTML, I don't think > this is a super critical issue (please correct me if I'm wrong), but we > should resolve this soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user StevenLangbroek commented on the issue: https://github.com/apache/flink/pull/5395 @alpinegizmo if we can't upgrade Jekyll we can't do what the ticket set out to do: get rid of yajl-ruby. ---
[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher
[ https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348859#comment-16348859 ] ASF GitHub Bot commented on FLINK-8308: --- Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5395 These lines need to be restored to the Gemfile. The hawkins plugin is needed for the incremental build and live reload feature: group :jekyll_plugins do gem 'hawkins' end. The bundled version of jekyll (3.7.2) requires ruby >= 2.1, but our build machines use ruby 2.0. If we can't get the apache INFRA team to upgrade ruby, we'll have to rework this. The Gemfile.lock doesn't work with ruby 2.1 -- I get this error: ruby_dep-1.5.0 requires ruby version >= 2.2.5, which is incompatible with the current version, ruby 2.1.10p492. But re-bundling fixes this. I think we should commit a Gemfile.lock file that is compatible back to Ruby 2.1, if we determine we can get ruby 2.1 -- otherwise we'll have to roll back jekyll and then re-bundle with ruby 2.0. This PR works fine on ruby 2.3 and 2.4. The latest stable release of rvm doesn't yet support ruby 2.5, so I didn't test it. > Update yajl-ruby dependency to 1.3.1 or higher > -- > > Key: FLINK-8308 > URL: https://issues.apache.org/jira/browse/FLINK-8308 > Project: Flink > Issue Type: Task > Components: Project Website >Reporter: Fabian Hueske >Assignee: Steven Langbroek >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > We got notified that yajl-ruby < 1.3.1, a dependency which is used to build > the Flink website, has a security vulnerability of high severity. > We should update yajl-ruby to 1.3.1 or higher. > Since the website is built offline and served as static HTML, I don't think > this is a super critical issue (please correct me if I'm wrong), but we > should resolve this soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...
Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5395 These lines need to be restored to the Gemfile. The hawkins plugin is needed for the incremental build and live reload feature: group :jekyll_plugins do gem 'hawkins' end. The bundled version of jekyll (3.7.2) requires ruby >= 2.1, but our build machines use ruby 2.0. If we can't get the apache INFRA team to upgrade ruby, we'll have to rework this. The Gemfile.lock doesn't work with ruby 2.1 -- I get this error: ruby_dep-1.5.0 requires ruby version >= 2.2.5, which is incompatible with the current version, ruby 2.1.10p492. But re-bundling fixes this. I think we should commit a Gemfile.lock file that is compatible back to Ruby 2.1, if we determine we can get ruby 2.1 -- otherwise we'll have to roll back jekyll and then re-bundle with ruby 2.0. This PR works fine on ruby 2.3 and 2.4. The latest stable release of rvm doesn't yet support ruby 2.5, so I didn't test it. ---
[jira] [Closed] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks
[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5820. --- > Extend State Backend Abstraction to support Global Cleanup Hooks > > > Key: FLINK-5820 > URL: https://issues.apache.org/jira/browse/FLINK-5820 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The current state backend abstraction has the limitation that each piece of > state is only meaningful in the context of its state handle. There is no > possibility of a view onto "all state associated with checkpoint X". > That causes several issues > - State might not be cleaned up in the process of failures. When a > TaskManager hands over a state handle to the JobManager and either of them > has a failure, the state handle may be lost and state lingers. > - State might also linger if a cleanup operation failed temporarily, and > the checkpoint metadata was already disposed > - State cleanup is more expensive than necessary in many cases. Each state > handle is individually released. For large jobs, this means 1000s of release > operations (typically file deletes) per checkpoint, which can be expensive on > some file systems. > - It is hard to guarantee cleanup of parent directories with the current > architecture. > The core changes proposed here are: > 1. Each job has one core {{StateBackend}}. In the future, operators may > have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix > and match for example RocksDB storage and in-memory storage. > 2. The JobManager needs to be aware of the {{StateBackend}}. > 3. Storing checkpoint metadata becomes responsibility of the state backend, > not the "completed checkpoint store". The latter only stores the pointers to > the available latest checkpoints (either in process or in ZooKeeper). > 4. 
The StateBackend may optionally have a hook to drop all checkpointed > state that belongs to only one specific checkpoint (shared state comes as > part of incremental checkpointing). > 5. The StateBackend needs to have a hook to drop all checkpointed state up > to a specific checkpoint (for all previously discarded checkpoints). > 6. In the future, this must support periodic cleanup hooks that track > orphaned shared state from incremental checkpoints. > For the {{FsStateBackend}}, which stores most of the checkpointes state > currently (transitively for RocksDB as well), this means a re-structuring of > the storage directories as follows: > {code} > ..//job1-id/ > /shared/<-- shared checkpoint data > /chk-1/... <-- data exclusive to checkpoint 1 > /chk-2/... <-- data exclusive to checkpoint 2 > /chk-3/... <-- data exclusive to checkpoint 3 > ..//job2-id/ > /shared/... > /chk-1/... > /chk-2/... > /chk-3/... > ..//savepoint-1/savepoint-root > /file-1-uid > /file-2-uid > /file-3-uid > /savepoint-2/savepoint-root > /file-1-uid > /file-2-uid > /file-3-uid > {code} > This is the umbrella issue for the individual steps needed to address this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
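The directory layout proposed in the issue can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the class `CheckpointLayoutSketch` and its methods are hypothetical names, not Flink API, and the checkpoint root path is a placeholder. It shows how a per-job directory with one `shared` directory and one `chk-N` directory per checkpoint makes "drop everything up to checkpoint N" (item 5) a matter of selecting whole directories rather than releasing individual state handles.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink; it only mirrors the proposed layout. */
final class CheckpointLayoutSketch {

    private final Path jobDir;

    CheckpointLayoutSketch(String checkpointRoot, String jobId) {
        // <root>/<job-id>/ is the per-job base directory (root is a placeholder).
        this.jobDir = Paths.get(checkpointRoot, jobId);
    }

    /** Directory for state shared between checkpoints (incremental checkpointing). */
    Path sharedDir() {
        return jobDir.resolve("shared");
    }

    /** Directory for state that belongs to exactly one checkpoint. */
    Path exclusiveDir(long checkpointId) {
        return jobDir.resolve("chk-" + checkpointId);
    }

    /** Exclusive directories to drop when discarding all checkpoints in [oldestId, upToId]. */
    List<Path> dirsToDropUpTo(long oldestId, long upToId) {
        List<Path> dirs = new ArrayList<>();
        for (long id = oldestId; id <= upToId; id++) {
            dirs.add(exclusiveDir(id));
        }
        return dirs;
    }

    public static void main(String[] args) {
        CheckpointLayoutSketch layout = new CheckpointLayoutSketch("/checkpoints", "job1-id");
        System.out.println(layout.sharedDir());
        System.out.println(layout.dirsToDropUpTo(1, 3));
    }
}
```

Note that the shared directory is never part of the drop list here; tracking orphaned shared state is the separate, future concern named in item 6.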
[jira] [Resolved] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks
[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5820. - Resolution: Fixed Fixed by resolution of all subtasks > Extend State Backend Abstraction to support Global Cleanup Hooks > > > Key: FLINK-5820 > URL: https://issues.apache.org/jira/browse/FLINK-5820 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0
[jira] [Commented] (FLINK-8472) Extend migration tests for Flink 1.4
[ https://issues.apache.org/jira/browse/FLINK-8472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348831#comment-16348831 ] ASF GitHub Bot commented on FLINK-8472: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5364 Looks good but let's wait what @zentol has to say. > Extend migration tests for Flink 1.4 > > > Key: FLINK-8472 > URL: https://issues.apache.org/jira/browse/FLINK-8472 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > The following migration tests should be extended to cover migrating Flink > jobs with a 1.4 savepoint. > * {{WindowOperatorMigrationTest}} > * {{CEPMigrationTest}} > * {{StatefulJobSavepointMigrationTestITCase}} > * {{FlinkKinesisConsumerMigrationTest}} > * {{FlinkKafkaConsumerBaseMigrationTest}} > * {{ContinuousFileProcessingMigrationTest}} > * {{BucketingSinkMigrationTest}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5364: [FLINK-8472] [test] Extend all migration tests for Flink ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5364 Looks good but let's wait what @zentol has to say. ---
[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks
[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348805#comment-16348805 ] ASF GitHub Bot commented on FLINK-5820: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5396 Merged in 31e97e57ceeaf37264ab6db078552b73ee5121bf > Extend State Backend Abstraction to support Global Cleanup Hooks > > > Key: FLINK-5820 > URL: https://issues.apache.org/jira/browse/FLINK-5820 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0
[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks
[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348806#comment-16348806 ] ASF GitHub Bot commented on FLINK-5820: --- Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/5396 > Extend State Backend Abstraction to support Global Cleanup Hooks > > > Key: FLINK-5820 > URL: https://issues.apache.org/jira/browse/FLINK-5820 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0
[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/5396 ---
[GitHub] flink issue #5396: [FLINK-5820] [state backends] Split shared/exclusive stat...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5396 Merged in 31e97e57ceeaf37264ab6db078552b73ee5121bf ---
[jira] [Resolved] (FLINK-8539) Introduce "CompletedCheckpointStorageLocation" to explicitly handle disposal of checkpoint storage locations
[ https://issues.apache.org/jira/browse/FLINK-8539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-8539.
---------------------------------
    Resolution: Fixed

Fixed as of d85a62db13a1d90159e7e8f924c8006febad552b
  - f9dd19b584fca5932594392b0afc9f1d0eec7f1a
  - f577d2603fb0432fa7a15d54ee25f481bde95d95
  - d85a62db13a1d90159e7e8f924c8006febad552b

> Introduce "CompletedCheckpointStorageLocation" to explicitly handle disposal of checkpoint storage locations
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8539
>                 URL: https://issues.apache.org/jira/browse/FLINK-8539
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Major
>             Fix For: 1.5.0
>
>
> The storage location of completed checkpoints lacks a proper representation.
> Because of that, there is no place that can handle the deletion of a
> checkpoint directory, or the dropping of a checkpoint-specific table.
> The current workaround for file systems is, for example, that every file disposal
> checks if the parent directory is now empty, and deletes it if that is the
> case. That is not only inefficient, but prohibitively expensive on some
> systems, like Amazon S3.
> Properly representing the storage location for completed checkpoints allows
> us to add a disposal call for that location.
> That {{CompletedCheckpointStorageLocation}} can also be used to capture
> "external pointers", metadata, and even allow us to use custom serialization
> and deserialization of the metadata in the future, making the abstraction
> more extensible by allowing users to introduce new types of state handles.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
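The idea of giving the storage location its own disposal call can be sketched as follows. This is a minimal illustration, not the Flink implementation: `StorageLocationSketch`, `DirectoryStorageLocationSketch` and `dispose()` are hypothetical names, and the sketch assumes a local directory accessed through `java.nio.file`. The point is that the owner of the location removes the directory once, so individual file handles no longer need to check whether their parent directory has become empty.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical stand-in for a "completed checkpoint storage location". */
interface StorageLocationSketch {
    /** Disposes the whole location in one call. */
    void dispose() throws IOException;
}

/** Directory-backed location; assumes a local file system for illustration. */
final class DirectoryStorageLocationSketch implements StorageLocationSketch {

    private final Path checkpointDir;

    DirectoryStorageLocationSketch(Path checkpointDir) {
        this.checkpointDir = checkpointDir;
    }

    @Override
    public void dispose() throws IOException {
        if (!Files.exists(checkpointDir)) {
            return; // nothing to clean up
        }
        List<Path> deepestFirst;
        try (Stream<Path> paths = Files.walk(checkpointDir)) {
            // Reverse order puts children before their parent directories.
            deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : deepestFirst) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("chk-demo");
        Files.write(dir.resolve("file-1"), new byte[] {1});
        new DirectoryStorageLocationSketch(dir).dispose();
        System.out.println("still exists: " + Files.exists(dir));
    }
}
```

On an object store such as Amazon S3 the deletion itself would look different; the sketch only shows where the responsibility for removing the directory sits.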
[jira] [Closed] (FLINK-8539) Introduce "CompletedCheckpointStorageLocation" to explicitly handle disposal of checkpoint storage locations
[ https://issues.apache.org/jira/browse/FLINK-8539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8539. --- > Introduce "CompletedCheckpointStorageLocation" to explicitly handle disposal > of checkpoint storage locations > > > Key: FLINK-8539 > URL: https://issues.apache.org/jira/browse/FLINK-8539 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.5.0
[jira] [Closed] (FLINK-8540) FileStateHandles must not attempt to delete their parent directory.
[ https://issues.apache.org/jira/browse/FLINK-8540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8540. --- > FileStateHandles must not attempt to delete their parent directory. > --- > > Key: FLINK-8540 > URL: https://issues.apache.org/jira/browse/FLINK-8540 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.5.0 > > > Currently, every file disposal checks if the parent directory is now empty, > and deletes it if that is the case. That is not only inefficient, but > prohibitively expensive on some systems, like Amazon S3. > With the resolution of [FLINK-8539], this will no longer be necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8540) FileStateHandles must not attempt to delete their parent directory.
[ https://issues.apache.org/jira/browse/FLINK-8540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8540. - Resolution: Fixed Fixed via 31e97e57ceeaf37264ab6db078552b73ee5121bf > FileStateHandles must not attempt to delete their parent directory. > --- > > Key: FLINK-8540 > URL: https://issues.apache.org/jira/browse/FLINK-8540 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.5.0
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348797#comment-16348797 ] ASF GitHub Bot commented on FLINK-8345: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165399750 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java --- @@ -586,7 +586,7 @@ private StreamGraph generateInternal(Listtransformatio transform.getOutputType(), transform.getName()); - if (transform.getStateKeySelector1() != null) { + if (transform.getStateKeySelector1() != null || transform.getStateKeySelector2() != null) { --- End diff -- Does this, in combination with the comment below on the `DataStreamTest` mean that you want to move the `DataStreamTest` test in a separate `ITCase`? > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r165399750

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---
    @@ -586,7 +586,7 @@ private StreamGraph generateInternal(Listtransformatio
     				transform.getOutputType(),
     				transform.getName());

    -		if (transform.getStateKeySelector1() != null) {
    +		if (transform.getStateKeySelector1() != null || transform.getStateKeySelector2() != null) {
    --- End diff --

    Does this, in combination with the comment below on the `DataStreamTest` mean that you want to move the `DataStreamTest` test in a separate `ITCase`?

---
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348790#comment-16348790 ] ASF GitHub Bot commented on FLINK-8345: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165397632 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -753,6 +763,182 @@ public void onTimer( assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator); } + @Test + public void testConnectWithBroadcastTranslation() throws Exception { + + final Mapexpected = new HashMap<>(); + expected.put(0L, "test:0"); + expected.put(1L, "test:1"); + expected.put(2L, "test:2"); + expected.put(3L, "test:3"); + expected.put(4L, "test:4"); + expected.put(5L, "test:5"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + final DataStream srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new CustomWmEmitter() { + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + }).keyBy((KeySelector ) value -> value); + + final DataStream srcTwo = env.fromCollection(expected.values()) + .assignTimestampsAndWatermarks(new CustomWmEmitter() { + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + return Long.parseLong(element.split(":")[1]); + } + }); + + final BroadcastStream broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR); + + // the timestamp should be high enough to trigger the timer after all the elements arrive. + final DataStream output = srcOne.connect(broadcast).process( + new TestBroadcastProcessFunction(10L, expected)); + + output.addSink(new DiscardingSink<>()); + env.execute(); --- End diff -- So far, all tests in this are purely translation tests. 
I mentioned this in another comment, that it would be good to have an ITCase that actually verifies that using keyed state works and that the other features work as well in a complete program. Have a look at `SideOutputITCase`, for example. > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348792#comment-16348792 ] ASF GitHub Bot commented on FLINK-8345: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165395860 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReadOnlyBroadcastState; +import org.apache.flink.api.common.state.ReadWriteBroadcastState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateFunction; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.SimpleTimerService; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}. + * + * @param The key type of the input keyed stream. + * @param The input type of the keyed (non-broadcast) side. + * @param The input type of the broadcast side. 
+ * @param The output type of the operator. + */ +@Internal +public class CoBroadcastWithKeyedOperator+ extends AbstractUdfStreamOperator > + implements TwoInputStreamOperator , Triggerable { + + private static final long serialVersionUID = 5926499536290284870L; + + private final List broadcastStateDescriptors; + + private transient TimestampedCollector collector; + + private transient Map broadcastStates; + + private transient ReadWriteContextImpl rwContext; + + private transient ReadOnlyContextImpl rContext; + + private transient OnTimerContextImpl onTimerContext; + + public CoBroadcastWithKeyedOperator( + final KeyedBroadcastProcessFunction function, + final List broadcastStateDescriptors) { + super(function); + this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors); + } + + @Override + public void open() throws Exception { + super.open(); + + InternalTimerService internalTimerService = +
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348791#comment-16348791 ] ASF GitHub Bot commented on FLINK-8345: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165394684 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.co; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReadOnlyBroadcastState; +import org.apache.flink.api.common.state.ReadWriteBroadcastState; +import org.apache.flink.util.OutputTag; + +/** + * The base class containing the functionality available to all broadcast process function. + * These include the {@link BroadcastProcessFunction} and the {@link KeyedBroadcastProcessFunction}. 
+ */ +@PublicEvolving +public abstract class BaseBroadcastProcessFunction extends AbstractRichFunction { + + private static final long serialVersionUID = -131631008887478610L; + + /** +* The base context available to all methods in a broadcast process function. This +* include {@link BroadcastProcessFunction BroadcastProcessFunctions} and +* {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}. +*/ + abstract class Context { + + /** +* Timestamp of the element currently being processed or timestamp of a firing timer. +* +* This might be {@code null}, for example if the time characteristic of your program +* is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. +*/ + public abstract Long timestamp(); + + /** +* Emits a record to the side output identified by the {@link OutputTag}. +* +* @param outputTag the {@code OutputTag} that identifies the side output to emit to. +* @param value The record to emit. +*/ + public abstract void output(OutputTag outputTag, X value); + + /** Returns the current processing time. */ + public abstract long currentProcessingTime(); + + /** Returns the current event-time watermark. */ + public abstract long currentWatermark(); + } + + /** +* A base {@link Context context} available to the broadcasted stream side of +* a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream}. +* +* Apart from the basic functionality of a {@link Context context}, +* this also allows to get and update the elements stored in the +* {@link ReadWriteBroadcastState broadcast state}. +* In other words, it gives read/write access to the broadcast state. +*/ + public abstract class ReadWriteContext extends Context { --- End diff -- I think this could be called `Context`, similar to my comment on `ReadWriteBroadcastState`. > Iterate over keyed state on broadcast side of connect with broadcast. 
> - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348793#comment-16348793 ] ASF GitHub Bot commented on FLINK-8345: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165398176 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -753,6 +763,182 @@ public void onTimer( assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator); } + @Test + public void testConnectWithBroadcastTranslation() throws Exception { + + final Mapexpected = new HashMap<>(); + expected.put(0L, "test:0"); + expected.put(1L, "test:1"); + expected.put(2L, "test:2"); + expected.put(3L, "test:3"); + expected.put(4L, "test:4"); + expected.put(5L, "test:5"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + final DataStream srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new CustomWmEmitter() { + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + }).keyBy((KeySelector ) value -> value); + + final DataStream srcTwo = env.fromCollection(expected.values()) + .assignTimestampsAndWatermarks(new CustomWmEmitter() { + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + return Long.parseLong(element.split(":")[1]); + } + }); + + final BroadcastStream broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR); + + // the timestamp should be high enough to trigger the timer after all the elements arrive. 
+ final DataStream output = srcOne.connect(broadcast).process( + new TestBroadcastProcessFunction(10L, expected)); + + output.addSink(new DiscardingSink<>()); + env.execute(); + } + + private abstract static class CustomWmEmitter implements AssignerWithPunctuatedWatermarks { + + @Nullable + @Override + public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) { + return new Watermark(extractedTimestamp); + } + } + + private static class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction { + + private final Map expectedState; + + private final long timerTimestamp; + + static final MapStateDescriptor DESCRIPTOR = new MapStateDescriptor<>( + "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); + + TestBroadcastProcessFunction( + final long timerTS, + final Map expectedBroadcastState + ) { + expectedState = expectedBroadcastState; + timerTimestamp = timerTS; + } + + @Override + public void processElement(Long value, KeyedReadOnlyContext ctx, Collector out) throws Exception { + ctx.timerService().registerEventTimeTimer(timerTimestamp); + } + + @Override + public void processBroadcastElement(String value, KeyedReadWriteContext ctx, Collector out) throws Exception { + long key = Long.parseLong(value.split(":")[1]); + ctx.getBroadcastState(DESCRIPTOR).put(key, value); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { + Map map = new HashMap<>(); + for (Map.Entry entry : ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) { + map.put(entry.getKey(), entry.getValue()); + }
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348789#comment-16348789 ] ASF GitHub Bot commented on FLINK-8345: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165395176 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java --- @@ -586,7 +586,7 @@ private StreamGraph generateInternal(Listtransformatio transform.getOutputType(), transform.getName()); - if (transform.getStateKeySelector1() != null) { + if (transform.getStateKeySelector1() != null || transform.getStateKeySelector2() != null) { --- End diff -- I think we need some test that using keyed state actually works when only one of the inputs is keyed. I think we need an `ITCase` for that somewhere. > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165398176 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -753,6 +763,182 @@ public void onTimer( assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator); } + @Test + public void testConnectWithBroadcastTranslation() throws Exception { + + final Mapexpected = new HashMap<>(); + expected.put(0L, "test:0"); + expected.put(1L, "test:1"); + expected.put(2L, "test:2"); + expected.put(3L, "test:3"); + expected.put(4L, "test:4"); + expected.put(5L, "test:5"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + final DataStream srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new CustomWmEmitter() { + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + }).keyBy((KeySelector ) value -> value); + + final DataStream srcTwo = env.fromCollection(expected.values()) + .assignTimestampsAndWatermarks(new CustomWmEmitter() { + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + return Long.parseLong(element.split(":")[1]); + } + }); + + final BroadcastStream broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR); + + // the timestamp should be high enough to trigger the timer after all the elements arrive. 
+ final DataStream output = srcOne.connect(broadcast).process( + new TestBroadcastProcessFunction(10L, expected)); + + output.addSink(new DiscardingSink<>()); + env.execute(); + } + + private abstract static class CustomWmEmitter implements AssignerWithPunctuatedWatermarks { + + @Nullable + @Override + public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) { + return new Watermark(extractedTimestamp); + } + } + + private static class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction { + + private final Map expectedState; + + private final long timerTimestamp; + + static final MapStateDescriptor DESCRIPTOR = new MapStateDescriptor<>( + "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); + + TestBroadcastProcessFunction( + final long timerTS, + final Map expectedBroadcastState + ) { + expectedState = expectedBroadcastState; + timerTimestamp = timerTS; + } + + @Override + public void processElement(Long value, KeyedReadOnlyContext ctx, Collector out) throws Exception { + ctx.timerService().registerEventTimeTimer(timerTimestamp); + } + + @Override + public void processBroadcastElement(String value, KeyedReadWriteContext ctx, Collector out) throws Exception { + long key = Long.parseLong(value.split(":")[1]); + ctx.getBroadcastState(DESCRIPTOR).put(key, value); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { + Map map = new HashMap<>(); + for (Map.Entry entry : ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) { + map.put(entry.getKey(), entry.getValue()); + } + Assert.assertEquals(expectedState, map); + } + } + + /** +* Tests that with a {@link KeyedStream} we have to provide a {@link KeyedBroadcastProcessFunction}. +*/ +
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165395176 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java --- @@ -586,7 +586,7 @@ private StreamGraph generateInternal(Listtransformatio transform.getOutputType(), transform.getName()); - if (transform.getStateKeySelector1() != null) { + if (transform.getStateKeySelector1() != null || transform.getStateKeySelector2() != null) { --- End diff -- I think we need some test that using keyed state actually works when only one of the inputs is keyed. I think we need an `ITCase` for that somewhere. ---
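To make the condition under review concrete, here is a minimal plain-Java sketch (not Flink code) of the check the diff introduces: keyed state has to be configured when either of the two inputs carries a key selector, not only the first. `KeyedInputCheck` and `needsKeyedState` are hypothetical names, and `java.util.function.Function` merely stands in for Flink's `KeySelector`.

```java
import java.util.function.Function;

final class KeyedInputCheck {

    // Hypothetical helper: returns true if at least one of the two inputs is keyed.
    // A null selector models an input that is not keyed.
    static boolean needsKeyedState(Function<?, ?> keySelector1, Function<?, ?> keySelector2) {
        return keySelector1 != null || keySelector2 != null;
    }

    public static void main(String[] args) {
        Function<Long, Long> identity = value -> value;
        // Only the second input is keyed: the old check (first selector only) would miss this case.
        System.out.println("second input keyed: " + needsKeyedState(null, identity));
        System.out.println("no input keyed:     " + needsKeyedState(null, null));
    }
}
```

The case where only the second selector is set is exactly the one the requested `ITCase` would need to exercise end to end.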
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165397632 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -753,6 +763,182 @@ public void onTimer( assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator); } + @Test + public void testConnectWithBroadcastTranslation() throws Exception { + + final Mapexpected = new HashMap<>(); + expected.put(0L, "test:0"); + expected.put(1L, "test:1"); + expected.put(2L, "test:2"); + expected.put(3L, "test:3"); + expected.put(4L, "test:4"); + expected.put(5L, "test:5"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + final DataStream srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new CustomWmEmitter() { + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + }).keyBy((KeySelector ) value -> value); + + final DataStream srcTwo = env.fromCollection(expected.values()) + .assignTimestampsAndWatermarks(new CustomWmEmitter() { + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + return Long.parseLong(element.split(":")[1]); + } + }); + + final BroadcastStream broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR); + + // the timestamp should be high enough to trigger the timer after all the elements arrive. + final DataStream output = srcOne.connect(broadcast).process( + new TestBroadcastProcessFunction(10L, expected)); + + output.addSink(new DiscardingSink<>()); + env.execute(); --- End diff -- So far, all tests in this are purely translation tests. I mentioned this in another comment, that it would be good to have an ITCase that actually verifies that using keyed state works and that the other features work as well in a complete program. 
Have a look at `SideOutputITCase`, for example. ---
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165394684 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.co; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReadOnlyBroadcastState; +import org.apache.flink.api.common.state.ReadWriteBroadcastState; +import org.apache.flink.util.OutputTag; + +/** + * The base class containing the functionality available to all broadcast process function. + * These include the {@link BroadcastProcessFunction} and the {@link KeyedBroadcastProcessFunction}. + */ +@PublicEvolving +public abstract class BaseBroadcastProcessFunction extends AbstractRichFunction { + + private static final long serialVersionUID = -131631008887478610L; + + /** +* The base context available to all methods in a broadcast process function. 
This +* include {@link BroadcastProcessFunction BroadcastProcessFunctions} and +* {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}. +*/ + abstract class Context { + + /** +* Timestamp of the element currently being processed or timestamp of a firing timer. +* +* This might be {@code null}, for example if the time characteristic of your program +* is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. +*/ + public abstract Long timestamp(); + + /** +* Emits a record to the side output identified by the {@link OutputTag}. +* +* @param outputTag the {@code OutputTag} that identifies the side output to emit to. +* @param value The record to emit. +*/ + public abstract void output(OutputTag outputTag, X value); + + /** Returns the current processing time. */ + public abstract long currentProcessingTime(); + + /** Returns the current event-time watermark. */ + public abstract long currentWatermark(); + } + + /** +* A base {@link Context context} available to the broadcasted stream side of +* a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream}. +* +* Apart from the basic functionality of a {@link Context context}, +* this also allows to get and update the elements stored in the +* {@link ReadWriteBroadcastState broadcast state}. +* In other words, it gives read/write access to the broadcast state. +*/ + public abstract class ReadWriteContext extends Context { --- End diff -- I think this could be called `Context`, similar to my comment on `ReadWriteBroadcastState`. ---
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165395860 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReadOnlyBroadcastState; +import org.apache.flink.api.common.state.ReadWriteBroadcastState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateFunction; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.SimpleTimerService; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}. + * + * @param The key type of the input keyed stream. + * @param The input type of the keyed (non-broadcast) side. + * @param The input type of the broadcast side. 
+ * @param The output type of the operator. + */ +@Internal +public class CoBroadcastWithKeyedOperator+ extends AbstractUdfStreamOperator > + implements TwoInputStreamOperator , Triggerable { + + private static final long serialVersionUID = 5926499536290284870L; + + private final List broadcastStateDescriptors; + + private transient TimestampedCollector collector; + + private transient Map broadcastStates; + + private transient ReadWriteContextImpl rwContext; + + private transient ReadOnlyContextImpl rContext; + + private transient OnTimerContextImpl onTimerContext; + + public CoBroadcastWithKeyedOperator( + final KeyedBroadcastProcessFunction function, + final List broadcastStateDescriptors) { + super(function); + this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors); + } + + @Override + public void open() throws Exception { + super.open(); + + InternalTimerService internalTimerService = + getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); + + TimerService timerService = new SimpleTimerService(internalTimerService); + + collector = new TimestampedCollector<>(output); +
[jira] [Closed] (FLINK-8531) Support separation of "Exclusive", "Shared" and "Task owned" state
[ https://issues.apache.org/jira/browse/FLINK-8531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8531. --- > Support separation of "Exclusive", "Shared" and "Task owned" state > -- > > Key: FLINK-8531 > URL: https://issues.apache.org/jira/browse/FLINK-8531 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.5.0 > > > Currently, all state created at a certain checkpoint goes into the directory > {{chk-id}}. > With incremental checkpointing, some state is shared across checkpoints and is > referenced by newer checkpoints. That way, old {{chk-id}} directories stay > around, containing some shared chunks. That makes it hard for both users and > cleanup hooks to determine when a {{chk-x}} directory could be deleted. > The same holds for state that can only ever be dropped by certain operators > on the TaskManager, never by the JobManager / CheckpointCoordinator. Examples > of that state are write-ahead logs, which need to be retained until the move > to the target system is complete, which may in some cases be later than when > the checkpoint that created them is disposed. > I propose to introduce different scopes for tasks: > - **EXCLUSIVE** is for state that belongs to one checkpoint only > - **SHARED** is for state that is possibly part of multiple checkpoints > - **TASKOWNED** is for state that must never be dropped by the JobManager. > For file based checkpoint targets, I propose that we have the following > directory layout: > {code} > /user-defined-checkpoint-dir > | > + --shared/ > + --taskowned/ > + --chk-1/ > + --chk-2/ > + --chk-3/ > ... > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8531) Support separation of "Exclusive", "Shared" and "Task owned" state
[ https://issues.apache.org/jira/browse/FLINK-8531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8531. - Resolution: Fixed Fixed as of 4e481a72c1ed3cc5f177b511e5a72cd8726cf976 Consists of steps - 99495c91ecce7141ae8b2fbc96492681a9d130bd - 35c7d93ee85aa8689e804b713affa65b46af1acc - 9903c8c42793b922549835217c586c5928999ea5 - 5cc50934bdcf80ae1fa69abe69e2f214852653f9 - bb19e7f5278d43cd4fd265e3d2afa2fcc793ccf5 - 1887187f6b5c210d2091c69ef14fa8b8a5cae82c - fc21423e1f8f1a1661badef20f9c6f368f6daf8b - e0b0f45bd9c8b06bd2cda56f6859d0d3944aa00e - 4e481a72c1ed3cc5f177b511e5a72cd8726cf976 > Support separation of "Exclusive", "Shared" and "Task owned" state > -- > > Key: FLINK-8531 > URL: https://issues.apache.org/jira/browse/FLINK-8531 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.5.0 > > > Currently, all state created at a certain checkpoint goes into the directory > {{chk-id}}. > With incremental checkpointing, some state is shared across checkpoints and is > referenced by newer checkpoints. That way, old {{chk-id}} directories stay > around, containing some shared chunks. That makes it hard for both users and > cleanup hooks to determine when a {{chk-x}} directory could be deleted. > The same holds for state that can only ever be dropped by certain operators > on the TaskManager, never by the JobManager / CheckpointCoordinator. Examples > of that state are write-ahead logs, which need to be retained until the move > to the target system is complete, which may in some cases be later than when > the checkpoint that created them is disposed. > I propose to introduce different scopes for tasks: > - **EXCLUSIVE** is for state that belongs to one checkpoint only > - **SHARED** is for state that is possibly part of multiple checkpoints > - **TASKOWNED** is for state that must never be dropped by the JobManager. 
> For file based checkpoint targets, I propose that we have the following > directory layout: > {code} > /user-defined-checkpoint-dir > | > + --shared/ > + --taskowned/ > + --chk-1/ > + --chk-2/ > + --chk-3/ > ... > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
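The proposed scopes and directory layout can be sketched in plain Java as follows. This is only an illustration of the mapping described in the issue, not the code that was committed: the class `CheckpointScopeLayout`, the enum `Scope`, and the method `directoryFor` are hypothetical names, and the assumption that EXCLUSIVE state lands in `chk-<id>` is read off the directory listing above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class CheckpointScopeLayout {

    // The three scopes named in the proposal.
    enum Scope { EXCLUSIVE, SHARED, TASKOWNED }

    // Hypothetical helper: resolves the directory a piece of state would be written to.
    // Only EXCLUSIVE state depends on the checkpoint id; the other two scopes outlive
    // individual checkpoints and therefore live outside the chk-<id> directories.
    static Path directoryFor(Path checkpointRoot, Scope scope, long checkpointId) {
        switch (scope) {
            case SHARED:
                return checkpointRoot.resolve("shared");
            case TASKOWNED:
                return checkpointRoot.resolve("taskowned");
            case EXCLUSIVE:
            default:
                return checkpointRoot.resolve("chk-" + checkpointId);
        }
    }

    public static void main(String[] args) {
        Path root = Paths.get("/user-defined-checkpoint-dir");
        for (Scope scope : Scope.values()) {
            System.out.println(scope + " -> " + directoryFor(root, scope, 3L));
        }
    }
}
```

With this separation, deleting a `chk-<id>` directory touches only state that belongs to that one checkpoint, which is the cleanup property the issue is after.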
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348761#comment-16348761 ] ASF GitHub Bot commented on FLINK-8345: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165378004 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/ReadWriteBroadcastState.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Iterator; +import java.util.Map; + +/** + * A type of state that can be created to store the state of a {@code BroadcastStream}. This state assumes that + * the same elements are sent to all instances of an operator. + * + * CAUTION: the user has to guarantee that all task instances store the same elements in this type of state. + * + * Each operator instance individually maintains and stores elements in the broadcast state. The fact that the + * incoming stream is a broadcast one guarantees that all instances see all the elements. Upon recovery + * or re-scaling, the same state is given to each of the instances. 
To avoid hotspots, each task reads its previous + * partition, and if there are more tasks (scale up), then the new instances read from the old instances in a round + * robin fashion. This is why each instance has to guarantee that it stores the same elements as the rest. If not, + * upon recovery or rescaling you may have unpredictable redistribution of the partitions, thus unpredictable results. + * + * @param The key type of the elements in the {@link ReadWriteBroadcastState}. + * @param The value type of the elements in the {@link ReadWriteBroadcastState}. + */ +@PublicEvolving +public interface ReadWriteBroadcastStateextends ReadOnlyBroadcastState { --- End diff -- I think we can call this just `BroadcastState`. > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
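The redistribution described in the Javadoc above can be sketched in plain Java. This is a simplified model under stated assumptions, not Flink's actual restore logic: it assumes that new subtask `i` reads the partition written by old subtask `i % oldParallelism`, and the names `BroadcastStateRedistribution`, `sourcePartitionFor`, and `redistribute` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BroadcastStateRedistribution {

    // Assumed rule: a subtask that existed before reads its own old partition;
    // additional subtasks wrap around the old partitions in round-robin order.
    static int sourcePartitionFor(int newSubtask, int oldParallelism) {
        if (oldParallelism <= 0) {
            throw new IllegalArgumentException("oldParallelism must be positive");
        }
        return newSubtask % oldParallelism;
    }

    // Builds the state each new subtask would start with after rescaling.
    static <T> List<T> redistribute(List<T> oldPartitions, int newParallelism) {
        List<T> result = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            result.add(oldPartitions.get(sourcePartitionFor(i, oldPartitions.size())));
        }
        return result;
    }

    public static void main(String[] args) {
        // If the old instances had stored different elements, the new instances
        // would start from different state, which is why the Javadoc insists
        // that every instance stores the same elements.
        List<String> consistent = Arrays.asList("rules-v1", "rules-v1");
        List<String> diverged = Arrays.asList("rules-v1", "rules-v2");
        System.out.println(redistribute(consistent, 4));
        System.out.println(redistribute(diverged, 4));
    }
}
```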
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348762#comment-16348762 ] ASF GitHub Bot commented on FLINK-8345: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165377821 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** + * A read-only type of state that gives read-only access to the state of a {@code BroadcastStream} --- End diff -- I think the `access to the state of a BroadCast` stream bit might be a bit confusing because it ties it too heavily to broadcast streams. We should also have a big warning here that you should not modify the result of `get()` and also that you should not modify the entries in the immutable iterator because this would lead to problems with the heap state backend. > Iterate over keyed state on broadcast side of connect with broadcast. 
> - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
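The warning requested in the review comment above comes down to object aliasing, which a small plain-Java model can show. This is a sketch, not Flink's heap state backend: `HeapStateAliasing` is a hypothetical stand-in that, like any on-heap map, hands out references to the stored objects instead of copies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HeapStateAliasing {

    // Stand-in for on-heap state: values are stored by reference, never copied.
    private final Map<String, List<String>> backing = new HashMap<>();

    void put(String key, List<String> value) {
        backing.put(key, value);
    }

    // Returns the stored object itself, so callers share it with the state.
    List<String> get(String key) {
        return backing.get(key);
    }

    // The view rejects put/remove/setValue, but the value objects stay mutable.
    Iterable<Map.Entry<String, List<String>>> immutableEntries() {
        return Collections.unmodifiableMap(backing).entrySet();
    }

    public static void main(String[] args) {
        HeapStateAliasing state = new HeapStateAliasing();
        state.put("rules", new ArrayList<>(Arrays.asList("r1")));

        // Looks like a local change, but silently modifies the stored state.
        state.get("rules").add("r2");

        // Same problem through the "immutable" iterator.
        for (Map.Entry<String, List<String>> entry : state.immutableEntries()) {
            entry.getValue().add("r3");
        }
        System.out.println("stored state: " + state.get("rules"));

        // Safe pattern: copy first, then modify the copy.
        List<String> copy = new ArrayList<>(state.get("rules"));
        copy.add("local-only");
        System.out.println("after copy:   " + state.get("rules"));
    }
}
```

On a read-only side of the function such a hidden write would make one parallel instance diverge from the others, which is the situation the broadcast state contract forbids.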
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348765#comment-16348765 ] ASF GitHub Bot commented on FLINK-8345: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165385868 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java --- @@ -148,21 +170,27 @@ public void close() throws IOException { @Override public void dispose() { IOUtils.closeQuietly(closeStreamOnCancelRegistry); - registeredStates.clear(); + registeredOperatorStates.clear(); + registeredBroadcastStates.clear(); } // --- // State access methods // --- + @Override + publicReadWriteBroadcastState getBroadcastState(MapStateDescriptor stateDescriptor) throws Exception { --- End diff -- I think we don't need this because the other `getBroadcastState()` is only ever called with `BROADCAST` mode. > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348764#comment-16348764 ] ASF GitHub Bot commented on FLINK-8345: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165388777 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java --- @@ -77,16 +90,33 @@ public void read(DataInputView in) throws IOException { super.read(in); int numKvStates = in.readShort(); - stateMetaInfoSnapshots = new ArrayList<>(numKvStates); + operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates); for (int i = 0; i < numKvStates; i++) { - stateMetaInfoSnapshots.add( - OperatorBackendStateMetaInfoSnapshotReaderWriters - .getReaderForVersion(getReadVersion(), userCodeClassLoader) - .readStateMetaInfo(in)); + operatorStateMetaInfoSnapshots.add( + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader) + .readOperatorStateMetaInfo(in)); } + + if (VERSION >= 3) { --- End diff -- Wouldn't this always depend on the version of the code and not the version of the snapshot? That is, if we restore from a `VERSION < 3` snapshot we should not go into this code path. I think here you can get that via `getReadVersion()`. > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
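The point of the review comment above is that a reader must branch on the version recorded in the snapshot, not on the version constant of the running code. The following self-contained sketch illustrates this with a toy format; `VersionedSnapshotSketch` and its methods are hypothetical and do not reproduce the layout used by `OperatorBackendSerializationProxy`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class VersionedSnapshotSketch {

    // Version written by the current code (toy counterpart of VERSION in the diff).
    static final int VERSION = 3;

    final List<String> operatorStates = new ArrayList<>();
    final List<String> broadcastStates = new ArrayList<>();

    // Toy writer: the broadcast section only exists from version 3 on.
    static byte[] write(int version, List<String> operatorStates, List<String> broadcastStates) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(version);
            writeNames(out, operatorStates);
            if (version >= 3) {
                writeNames(out, broadcastStates);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static VersionedSnapshotSketch read(byte[] snapshot) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot));
            VersionedSnapshotSketch result = new VersionedSnapshotSketch();
            int readVersion = in.readInt();
            readNames(in, result.operatorStates);
            // Checking "VERSION >= 3" here would always be true and would try to read
            // a broadcast section that older snapshots never wrote (EOFException).
            if (readVersion >= 3) {
                readNames(in, result.broadcastStates);
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeNames(DataOutputStream out, List<String> names) throws IOException {
        out.writeShort(names.size());
        for (String name : names) {
            out.writeUTF(name);
        }
    }

    private static void readNames(DataInputStream in, List<String> target) throws IOException {
        int count = in.readShort();
        for (int i = 0; i < count; i++) {
            target.add(in.readUTF());
        }
    }

    public static void main(String[] args) {
        byte[] oldSnapshot = write(2, Arrays.asList("op-state"), Collections.<String>emptyList());
        byte[] newSnapshot = write(VERSION, Arrays.asList("op-state"), Arrays.asList("broadcast-state"));
        System.out.println("v2 broadcast states: " + read(oldSnapshot).broadcastStates);
        System.out.println("v3 broadcast states: " + read(newSnapshot).broadcastStates);
    }
}
```

In the real class the equivalent of `readVersion` is what `getReadVersion()` returns, as the reviewer suggests.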
[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348763#comment-16348763 ] ASF GitHub Bot commented on FLINK-8345: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165387915 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java --- @@ -57,18 +63,25 @@ public int getVersion() { @Override public int[] getCompatibleVersions() { // we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x) - return new int[] {VERSION, 1}; + return new int[] {VERSION, 2, 1}; --- End diff -- This should probably now say "we are compatible with versions 3, 2, and 1". > Iterate over keyed state on broadcast side of connect with broadcast. > - > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165378004 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/ReadWriteBroadcastState.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Iterator; +import java.util.Map; + +/** + * A type of state that can be created to store the state of a {@code BroadcastStream}. This state assumes that + * the same elements are sent to all instances of an operator. + * + * CAUTION: the user has to guarantee that all task instances store the same elements in this type of state. + * + * Each operator instance individually maintains and stores elements in the broadcast state. The fact that the + * incoming stream is a broadcast one guarantees that all instances see all the elements. Upon recovery + * or re-scaling, the same state is given to each of the instances. To avoid hotspots, each task reads its previous + * partition, and if there are more tasks (scale up), then the new instances read from the old instances in a round + * robin fashion. 
This is why each instance has to guarantee that it stores the same elements as the rest. If not, + * upon recovery or rescaling you may have unpredictable redistribution of the partitions, thus unpredictable results. + * + * @param The key type of the elements in the {@link ReadWriteBroadcastState}. + * @param The value type of the elements in the {@link ReadWriteBroadcastState}. + */ +@PublicEvolving +public interface ReadWriteBroadcastStateextends ReadOnlyBroadcastState { --- End diff -- I think we can call this just `BroadcastState`. ---
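The redistribution described in the quoted JavaDoc (each task reads its previous partition, extra tasks read from the old ones round robin) can be sketched as a simple modulo assignment. This is one reading of that description, not Flink's actual restore code; `BroadcastRedistributionSketch`, `sourcePartition` and `assignments` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastRedistributionSketch {

    /**
     * Returns the index of the old partition that a restored subtask reads.
     * Subtasks that existed before read their own partition; additional
     * subtasks cycle through the old partitions round robin.
     */
    static int sourcePartition(int newSubtaskIndex, int oldParallelism) {
        if (oldParallelism <= 0 || newSubtaskIndex < 0) {
            throw new IllegalArgumentException("parallelism must be positive and index non-negative");
        }
        return newSubtaskIndex % oldParallelism;
    }

    /** Lists the source partition for every subtask of the rescaled operator. */
    static List<Integer> assignments(int oldParallelism, int newParallelism) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(sourcePartition(i, oldParallelism));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assignments(2, 5)); // prints [0, 1, 0, 1, 0]
        System.out.println(assignments(4, 2)); // prints [0, 1]
    }
}
```

Under this scheme a new subtask may be handed any old partition, which is why the JavaDoc insists that all instances hold the same elements: if partition 0 and partition 1 differed, subtasks 2 and 3 above would start from different state.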
[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r165377821 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** + * A read-only type of state that gives read-only access to the state of a {@code BroadcastStream} --- End diff -- I think the `access to the state of a BroadCast` stream bit might be a bit confusing because it ties it too heavily to broadcast streams. We should also have a big warning here that you should not modify the result of `get()` and also that you should not modify the entries in the immutable iterator because this would lead to problems with the heap state backend. ---
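The warning about not modifying the result of `get()` comes down to aliasing: a store that keeps objects on the heap hands out the stored object itself rather than a deserialized copy. The sketch below shows the effect with a toy store; `ToyHeapState` is a hypothetical stand-in and not Flink's heap state backend.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeapStateAliasingSketch {

    /** Toy stand-in for on-heap state: get() returns the stored object, not a copy. */
    static final class ToyHeapState {
        private final Map<String, List<String>> entries = new HashMap<>();

        void put(String key, List<String> value) {
            entries.put(key, value);
        }

        List<String> get(String key) {
            return entries.get(key);
        }
    }

    /** Mutates the object returned by get(); the stored state changes as a side effect. */
    static int sizeAfterMutatingResult() {
        ToyHeapState state = new ToyHeapState();
        state.put("rules", new ArrayList<>(Arrays.asList("r1")));
        List<String> result = state.get("rules");
        result.add("r2"); // silently changes what the store holds
        return state.get("rules").size();
    }

    /** Copies the result first; the stored state is left untouched. */
    static int sizeAfterMutatingCopy() {
        ToyHeapState state = new ToyHeapState();
        state.put("rules", new ArrayList<>(Arrays.asList("r1")));
        List<String> copy = new ArrayList<>(state.get("rules"));
        copy.add("r2");
        return state.get("rules").size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterMutatingResult()); // prints 2
        System.out.println(sizeAfterMutatingCopy());   // prints 1
    }
}
```

For state that is supposed to be read-only, the first case means a reader can change the contents without any write call, so the instances can drift apart unnoticed.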
[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher
[ https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348755#comment-16348755 ] ASF GitHub Bot commented on FLINK-8308: --- Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5395 Sure. > Update yajl-ruby dependency to 1.3.1 or higher > -- > > Key: FLINK-8308 > URL: https://issues.apache.org/jira/browse/FLINK-8308 > Project: Flink > Issue Type: Task > Components: Project Website >Reporter: Fabian Hueske >Assignee: Steven Langbroek >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > We got notified that yajl-ruby < 1.3.1, a dependency which is used to build > the Flink website, has a security vulnerability of high severity. > We should update yajl-ruby to 1.3.1 or higher. > Since the website is built offline and served as static HTML, I don't think > this is a super critical issue (please correct me if I'm wrong), but we > should resolve this soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher
[ https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348744#comment-16348744 ] ASF GitHub Bot commented on FLINK-8308: --- Github user StevenLangbroek commented on the issue: https://github.com/apache/flink/pull/5395 @alpinegizmo I tested locally with Ruby 2.3 (default on High Sierra). I'm having some trouble with `rvm`, could you help testing with different versions? > Update yajl-ruby dependency to 1.3.1 or higher > -- > > Key: FLINK-8308 > URL: https://issues.apache.org/jira/browse/FLINK-8308 > Project: Flink > Issue Type: Task > Components: Project Website >Reporter: Fabian Hueske >Assignee: Steven Langbroek >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > We got notified that yajl-ruby < 1.3.1, a dependency which is used to build > the Flink website, has a security vulnerability of high severity. > We should update yajl-ruby to 1.3.1 or higher. > Since the website is built offline and served as static HTML, I don't think > this is a super critical issue (please correct me if I'm wrong), but we > should resolve this soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher
[ https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348739#comment-16348739 ] ASF GitHub Bot commented on FLINK-8308: --- Github user alpinegizmo commented on the issue: https://github.com/apache/flink/pull/5395 It looks good, but I haven't tested it. I'm wondering what versions of Ruby this has been tested with. At a minimum it needs to work with whatever version we can get on the production build infrastructure, as well as 2.3 and 2.4, since most developers will have one of those versions. And maybe 2.5, since that's out now. > Update yajl-ruby dependency to 1.3.1 or higher > -- > > Key: FLINK-8308 > URL: https://issues.apache.org/jira/browse/FLINK-8308 > Project: Flink > Issue Type: Task > Components: Project Website >Reporter: Fabian Hueske >Assignee: Steven Langbroek >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > We got notified that yajl-ruby < 1.3.1, a dependency which is used to build > the Flink website, has a security vulnerability of high severity. > We should update yajl-ruby to 1.3.1 or higher. > Since the website is built offline and served as static HTML, I don't think > this is a super critical issue (please correct me if I'm wrong), but we > should resolve this soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8472) Extend migration tests for Flink 1.4
[ https://issues.apache.org/jira/browse/FLINK-8472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348713#comment-16348713 ] ASF GitHub Bot commented on FLINK-8472: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5364#discussion_r165385564 --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java --- @@ -100,9 +100,7 @@ public static void main(String[] args) throws Exception { .map(new StatefulStringStoringMap(mode, "first")) .setParallelism(4); - if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { --- End diff -- @zentol can you confirm whether these changes make sense? From the discussions I see in https://github.com/apache/flink/pull/3844, I assume this was a leftover to write a 1.2 savepoint (when uids couldn't be added for each chained operator separately). So it should be ok to remove this? > Extend migration tests for Flink 1.4 > > > Key: FLINK-8472 > URL: https://issues.apache.org/jira/browse/FLINK-8472 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > The following migration tests should be extended to cover migrating Flink > jobs with a 1.4 savepoint. > * {{WindowOperatorMigrationTest}} > * {{CEPMigrationTest}} > * {{StatefulJobSavepointMigrationTestITCase}} > * {{FlinkKinesisConsumerMigrationTest}} > * {{FlinkKafkaConsumerBaseMigrationTest}} > * {{ContinuousFileProcessingMigrationTest}} > * {{BucketingSinkMigrationTest}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8546) Respect savepoint settings and recover from latest checkpoint in Flip-6
Till Rohrmann created FLINK-8546: Summary: Respect savepoint settings and recover from latest checkpoint in Flip-6 Key: FLINK-8546 URL: https://issues.apache.org/jira/browse/FLINK-8546 Project: Flink Issue Type: Improvement Components: JobManager Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 The {{JobMaster}} should respect savepoints and recover from the latest checkpoint if possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8472) Extend migration tests for Flink 1.4
[ https://issues.apache.org/jira/browse/FLINK-8472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348707#comment-16348707 ] ASF GitHub Bot commented on FLINK-8472: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5364 @aljoscha @zentol could you have another quick look at commit 8882fb7? That commit extends `KeyedComplexChainTest`, `ChainBreakTest`, `ChainLengthIncreaseTest`, ... etc. also for 1.4. Once that is good, I'll merge this to `master` and `release-1.4`. > Extend migration tests for Flink 1.4 > > > Key: FLINK-8472 > URL: https://issues.apache.org/jira/browse/FLINK-8472 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > The following migration tests should be extended to cover migrating Flink > jobs with a 1.4 savepoint. > * {{WindowOperatorMigrationTest}} > * {{CEPMigrationTest}} > * {{StatefulJobSavepointMigrationTestITCase}} > * {{FlinkKinesisConsumerMigrationTest}} > * {{FlinkKafkaConsumerBaseMigrationTest}} > * {{ContinuousFileProcessingMigrationTest}} > * {{BucketingSinkMigrationTest}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
[ https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348691#comment-16348691 ] Dyana Rose commented on FLINK-8073: --- Another instance: https://travis-ci.org/apache/flink/jobs/336094494, https://api.travis-ci.org/v3/job/336094494/log.txt > Test instability > FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint() > - > > Key: FLINK-8073 > URL: https://issues.apache.org/jira/browse/FLINK-8073 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Kostas Kloudas >Priority: Critical > Labels: test-stability > > Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348692#comment-16348692 ] ASF GitHub Bot commented on FLINK-8384: --- Github user dyanarose commented on the issue: https://github.com/apache/flink/pull/5295 the ci fail looks to be a known flaky test: FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint > Session Window Assigner with Dynamic Gaps > - > > Key: FLINK-8384 > URL: https://issues.apache.org/jira/browse/FLINK-8384 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Dyana Rose >Priority: Minor > > *Reason for Improvement* > Currently both Session Window assigners only allow a static inactivity gap. > Given the following scenario, this is too restrictive: > * Given a stream of IoT events from many device types > * Assume each device type could have a different inactivity gap > * Assume each device type gap could change while sessions are in flight > By allowing dynamic inactivity gaps, the correct gap can be determined in the > [assignWindows > function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59] > by passing the element currently under consideration, the timestamp, and the > context to a user defined function. This eliminates the need to create > unwieldy work arounds if you only have static gaps. > Dynamic Session Window gaps should be available for both Event Time and > Processing Time streams. > (short preliminary discussion: > https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
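The idea in the issue description, deriving the inactivity gap from the element inside the window assigner, can be sketched without any Flink dependency. Everything named here (`GapExtractor`, `Event`, `Window`, `assignWindow`) is hypothetical and only mirrors the shape of the proposal; the interface that the pull request actually adds may look different.

```java
public class DynamicGapSketch {

    /** Hypothetical user-defined function that derives the session gap from an element. */
    public interface GapExtractor<T> {
        long gapMillis(T element);
    }

    /** Minimal IoT event carrying only the device type. */
    public static final class Event {
        public final String deviceType;

        public Event(String deviceType) {
            this.deviceType = deviceType;
        }
    }

    /** Half-open time interval [start, end). */
    public static final class Window {
        public final long start;
        public final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Assigns the initial session window for an element using a per-element gap. */
    public static <T> Window assignWindow(T element, long timestamp, GapExtractor<T> extractor) {
        long gap = extractor.gapMillis(element);
        if (gap <= 0) {
            throw new IllegalArgumentException("session gap must be positive");
        }
        return new Window(timestamp, timestamp + gap);
    }

    public static void main(String[] args) {
        // Assumed gaps for the example: thermostats 60 s, everything else 5 s.
        GapExtractor<Event> byDeviceType =
                e -> "thermostat".equals(e.deviceType) ? 60_000L : 5_000L;

        System.out.println(assignWindow(new Event("thermostat"), 1_000L, byDeviceType)); // prints [1000, 61000)
        System.out.println(assignWindow(new Event("doorbell"), 1_000L, byDeviceType));   // prints [1000, 6000)
    }
}
```

Because the extractor is called per element, a lookup that changes while sessions are in flight (the third bullet in the issue) only needs to change what `gapMillis` returns.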
[jira] [Commented] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348675#comment-16348675 ] Fabian Hueske commented on FLINK-8545: --
Yes, many users have been asking for this feature recently. Great that you're taking the initiative to work on this! I'd propose to start with the {{DataStream}} to {{Table}} upsert conversion and work on the {{UpsertTableSource}} interface later, because these will need careful API design. The keys could be defined similarly to the time indicators, i.e.,
{code:java}
DataStream[(String, Long, Int)] input = ???
// upsert with key
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// upsert without key -> single row table
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
{code}
I think it would be good to be able to declare a time attribute that decides whether an upsert is performed or not. For example, this could look like this:
{code:java}
DataStream[(String, Long, Int)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key)
{code}
> Implement upsert stream table source > - > > Key: FLINK-8545 > URL: https://issues.apache.org/jira/browse/FLINK-8545 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As more and more users are eager for ingesting data with upsert mode in flink > sql/table-api, it is valuable to enable table source with upsert mode. I will > provide a design doc later and we can have more discussions. Any suggestions > are warmly welcomed ! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
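To make the proposed semantics concrete, here is a plain Java sketch of what "upsert with key, ordered by a time attribute" could mean when the stream is materialized. It is an interpretation of the comment above, not an existing Flink API: `Row`, `materialize` and the rule "apply a row only if its time is not older than the stored one" are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertSemanticsSketch {

    /** One record of the (String, Long, Int) stream: payload a, time b, key c. */
    public static final class Row {
        public final String a;
        public final long b;
        public final int c;

        public Row(String a, long b, int c) {
            this.a = a;
            this.b = b;
            this.c = c;
        }
    }

    /**
     * Materializes the stream as a table keyed by c. A row replaces the stored
     * row for its key only if its time b is not older (assumed upsert order).
     */
    public static Map<Integer, Row> materialize(List<Row> stream) {
        Map<Integer, Row> table = new LinkedHashMap<>();
        for (Row row : stream) {
            Row current = table.get(row.c);
            if (current == null || row.b >= current.b) {
                table.put(row.c, row);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Row> stream = Arrays.asList(
                new Row("x", 10L, 1),
                new Row("y", 5L, 1),   // older than the stored row for key 1, so it is ignored
                new Row("z", 7L, 2));

        Map<Integer, Row> table = materialize(stream);
        System.out.println(table.size());   // prints 2
        System.out.println(table.get(1).a); // prints x
        System.out.println(table.get(2).a); // prints z
    }
}
```

The keyless variant from the comment corresponds to using one constant key for every row, which leaves a table with a single row.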
[jira] [Updated] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-8545: --- Description: As more and more users are eager for ingesting data with upsert mode in flink sql/table-api, it is valuable to enable table source with upsert mode. I will provide a design doc later and we can have more discussions. Any suggestions are warmly welcomed ! was: As more and more users are eager for ingesting data with upsert mode, it is valuable to enable table source with upsert mode. I will provide a design doc later and we can have more discussions. Any suggestions are warmly welcomed ! > Implement upsert stream table source > - > > Key: FLINK-8545 > URL: https://issues.apache.org/jira/browse/FLINK-8545 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As more and more users are eager for ingesting data with upsert mode in flink > sql/table-api, it is valuable to enable table source with upsert mode. I will > provide a design doc later and we can have more discussions. Any suggestions > are warmly welcomed ! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8545) Implement upsert stream table source
Hequn Cheng created FLINK-8545: -- Summary: Implement upsert stream table source Key: FLINK-8545 URL: https://issues.apache.org/jira/browse/FLINK-8545 Project: Flink Issue Type: New Feature Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng As more and more users are eager for ingesting data with upsert mode, it is valuable to enable table source with upsert mode. I will provide a design doc later and we can have more discussions. Any suggestions are warmly welcomed ! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5398: [hotfix][cep] Remove migration from 1.5 test
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5398 Thanks, LGTM. We can add it back once `release-1.5` is cut. It might not really make any difference in the end, but we never know :) ---
[GitHub] flink issue #5398: [hotfix][cep] Remove migration from 1.5 test
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5398 @tzulitai I removed the migration from 1.5 ---
[GitHub] flink pull request #5398: [hotfix][cep] Remove migration from 1.5 test
GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/5398 [hotfix][cep] Remove migration from 1.5 test You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink cep-remove-1-5-test Alternatively you can review and apply these changes as the patch at:
[GitHub] flink pull request #:
Github user dawidwys commented on the pull request: https://github.com/apache/flink/commit/a2533f406d46b1c5acb5f70c263f9afad839dffe#commitcomment-27262545 In flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java on line 74: You're right. I will create a hotfix PR that removes it. ---
[jira] [Updated] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-5372: - Affects Version/s: 1.5.0 > Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints() > -- > > Key: FLINK-5372 > URL: https://issues.apache.org/jira/browse/FLINK-5372 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.5.0 >Reporter: Aljoscha Krettek >Assignee: Stefan Richter >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.1 > > > The test is currently {{@Ignored}}. We have to change > {{AsyncCheckpointOperator}} to make sure that we can run fully > asynchronously. Then, the test will still fail because the canceling > behaviour was changed in the meantime. > {code} > public static class AsyncCheckpointOperator > extends AbstractStreamOperator > implements OneInputStreamOperator{ > @Override > public void open() throws Exception { > super.open(); > // also get the state in open, this way we are sure that it was > created before > // we trigger the test checkpoint > ValueState state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > } > @Override > public void processElement(StreamRecord element) throws Exception > { > // we also don't care > ValueState state = getPartitionedState( > VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, > new ValueStateDescriptor<>("count", > StringSerializer.INSTANCE, "hello")); > state.update(element.getValue()); > } > @Override > public void snapshotState(StateSnapshotContext context) throws Exception { > // do nothing so that we don't block > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-5372:
---------------------------------
    Fix Version/s: 1.5.0
[jira] [Commented] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348593#comment-16348593 ]

Till Rohrmann commented on FLINK-5372:
--------------------------------------

Another instance: https://api.travis-ci.org/v3/job/335627318/log.txt