[jira] [Commented] (FLINK-4939) GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint
[ https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636586#comment-15636586 ]

ASF GitHub Bot commented on FLINK-4939:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2707#discussion_r86563313

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java ---
    @@ -77,111 +84,144 @@ public GenericWriteAheadSink(CheckpointCommitter committer, TypeSerializer<IN> s
         public void open() throws Exception {
             super.open();
             committer.setOperatorId(id);
    -        committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
             committer.open();
    -        cleanState();
    -        checkpointStreamFactory =
    -            getContainingTask().createCheckpointStreamFactory(this);
    +
    +        checkpointStreamFactory = getContainingTask()
    +            .createCheckpointStreamFactory(this);
    +
    +        cleanRestoredHandles();
         }

         public void close() throws Exception {
             committer.close();
         }

         /**
    -     * Saves a handle in the state.
    +     * Called when a checkpoint barrier arrives. It closes any open streams to the backend
    +     * and marks them as pending for committing to the external, third-party storage system.
          *
    -     * @param checkpointId
    -     * @throws IOException
    +     * @param checkpointId the id of the latest received checkpoint.
    +     * @throws IOException in case something went wrong when handling the stream to the backend.
          */
         private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {
    +        Preconditions.checkNotNull(this.pendingCheckpoints, "The operator has not been properly initialized.");
    +
             //only add handle if a new OperatorState was created since the last snapshot
             if (out != null) {
    +            int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
                 StreamStateHandle handle = out.closeAndGetHandle();
    -            if (state.pendingHandles.containsKey(checkpointId)) {
    +
    +            PendingCheckpoint pendingCheckpoint = new PendingCheckpoint(
    +                checkpointId, subtaskIdx, timestamp, handle);
    +
    +            if (pendingCheckpoints.contains(pendingCheckpoint)) {
                     //we already have a checkpoint stored for that ID that may have been partially written,
                     //so we discard this "alternate version" and use the stored checkpoint
                     handle.discardState();
                 } else {
    -                state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
    +                pendingCheckpoints.add(pendingCheckpoint);
                 }
                 out = null;
             }
         }

         @Override
    -    public void snapshotState(FSDataOutputStream out,
    -                            long checkpointId,
    -                            long timestamp) throws Exception {
    +    public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
             saveHandleInState(checkpointId, timestamp);
    -        InstantiationUtil.serializeObject(out, state);
    +
    +        DataOutputViewStreamWrapper outStream = new DataOutputViewStreamWrapper(out);
    +        outStream.writeInt(pendingCheckpoints.size());
    +        for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {
    +            pendingCheckpoint.serialize(outStream);
    +        }
         }

         @Override
         public void restoreState(FSDataInputStream in) throws Exception {
    -        this.state = InstantiationUtil.deserializeObject(in, getUserCodeClassloader());
    +        final DataInputViewStreamWrapper inStream = new DataInputViewStreamWrapper(in);
    +        int numPendingHandles = inStream.readInt();
    +        for (int i = 0; i < numPendingHandles; i++) {
    +            pendingCheckpoints.add(PendingCheckpoint.restore(inStream, getUserCodeClassloader()));
    +        }
         }

    -    private void cleanState() throws Exception {
    -        synchronized (this.state.pendingHandles) {
    -            //remove all handles that were already committed
    -            Set<Long> pastCheckpointIds = this.state.pendingHandles.keySet();
    -            Set<Long> checkpointsToRemove = new HashSet<>();
    -            for (Long pastCheckpointId : pastCheckpointIds) {
    -                if
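The diff above replaces whole-object Java serialization of the state with a count-prefixed list, where each pending checkpoint serializes itself. A minimal, self-contained sketch of that round-trip follows; the `PendingCheckpoint` class here is a simplified stand-in (it drops the `StreamStateHandle` and keeps only the primitive fields), so names and layout are illustrative rather than Flink's actual code.

```java
import java.io.*;
import java.util.*;

public class PendingCheckpointSketch {

    // Simplified stand-in for the PendingCheckpoint in the diff: checkpoint id,
    // the index of the subtask that wrote the buffer, and a timestamp.
    static final class PendingCheckpoint {
        final long checkpointId;
        final int subtaskIdx;
        final long timestamp;

        PendingCheckpoint(long checkpointId, int subtaskIdx, long timestamp) {
            this.checkpointId = checkpointId;
            this.subtaskIdx = subtaskIdx;
            this.timestamp = timestamp;
        }

        void serialize(DataOutputStream out) throws IOException {
            out.writeLong(checkpointId);
            out.writeInt(subtaskIdx);
            out.writeLong(timestamp);
        }

        static PendingCheckpoint restore(DataInputStream in) throws IOException {
            return new PendingCheckpoint(in.readLong(), in.readInt(), in.readLong());
        }
    }

    // Mirrors snapshotState(): write a length prefix, then each pending checkpoint.
    static byte[] snapshot(List<PendingCheckpoint> pending) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(pending.size());
            for (PendingCheckpoint p : pending) {
                p.serialize(out);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors restoreState(): read the count, then restore each entry.
    static List<PendingCheckpoint> restore(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int num = in.readInt();
            List<PendingCheckpoint> pending = new ArrayList<>();
            for (int i = 0; i < num; i++) {
                pending.add(PendingCheckpoint.restore(in));
            }
            return pending;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<PendingCheckpoint> pending = Arrays.asList(
                new PendingCheckpoint(1L, 0, 1000L),
                new PendingCheckpoint(2L, 3, 2000L));
        List<PendingCheckpoint> restored = restore(snapshot(pending));
        System.out.println(restored.size());            // 2
        System.out.println(restored.get(1).subtaskIdx); // 3
    }
}
```

The point of the explicit per-entry format (versus `InstantiationUtil.serializeObject` on the whole state object) is that each entry becomes an independently readable unit, which is what later allows the entries to be redistributed across subtasks.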
[jira] [Commented] (FLINK-4174) Enhance Window Evictor
[ https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636619#comment-15636619 ]

ASF GitHub Bot commented on FLINK-4174:
---------------------------------------

Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2736

    @aljoscha I have made the changes, could you please review it?

> Enhance Window Evictor
> ----------------------
>
>            Key: FLINK-4174
>            URL: https://issues.apache.org/jira/browse/FLINK-4174
>        Project: Flink
>     Issue Type: Sub-task
>     Components: Streaming
>       Reporter: vishnu viswanath
>       Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit]. This includes:
> - Allow eviction of elements from the window in any order (not only from the beginning). To do this, the Evictor must go through the list of elements and remove those that have to be evicted, instead of the current approach of returning the count of elements to be removed from the beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement: [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality
Github user soniclavier commented on the issue:

    https://github.com/apache/flink/pull/2736

    @aljoscha I have made the changes, could you please review it?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
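The FLINK-4174 description above boils down to replacing "return a count of elements to drop from the front" with "walk the window's elements and remove any of them". A sketch of that idea is below; the `Evictor` interface and method names here are reduced, illustrative stand-ins, not Flink's actual evictor API.

```java
import java.util.*;
import java.util.function.Predicate;

public class EvictorSketch {

    // Sketch of the FLIP-4 shape: eviction hooks before and after the window
    // function, each receiving the window contents for in-place removal.
    interface Evictor<T> {
        void evictBefore(Iterable<T> elements);  // runs before the window function
        void evictAfter(Iterable<T> elements);   // runs after the window function
    }

    // Example evictor that removes every element matching a predicate,
    // regardless of its position in the window.
    static final class PredicateEvictor<T> implements Evictor<T> {
        private final Predicate<T> shouldEvict;

        PredicateEvictor(Predicate<T> shouldEvict) {
            this.shouldEvict = shouldEvict;
        }

        @Override
        public void evictBefore(Iterable<T> elements) {
            for (Iterator<T> it = elements.iterator(); it.hasNext(); ) {
                if (shouldEvict.test(it.next())) {
                    it.remove();  // removal in any order, not only from the beginning
                }
            }
        }

        @Override
        public void evictAfter(Iterable<T> elements) {
            // no-op here; eviction can equally run after the function is applied
        }
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(Arrays.asList(1, 7, 3, 9, 2));
        new PredicateEvictor<Integer>(x -> x > 5).evictBefore(window);
        System.out.println(window);  // [1, 3, 2]
    }
}
```

Note how the iterator-based contract lets an evictor drop the middle elements 7 and 9 while keeping earlier and later ones, which a "drop N from the front" contract cannot express.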
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636613#comment-15636613 ]

ASF GitHub Bot commented on FLINK-3930:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r86564819

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -57,24 +61,37 @@
         // constructor in order to work with the generic deserializer.
         //
    -    static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1)
    +    static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), magic number (4), Cookie (4), msg ID (1)
    --- End diff --

    Looks like this is the cookie length

> Implement Service-Level Authorization
> -------------------------------------
>
>            Key: FLINK-3930
>            URL: https://issues.apache.org/jira/browse/FLINK-3930
>        Project: Flink
>     Issue Type: New Feature
>     Components: Security
>       Reporter: Eron Wright
>       Assignee: Vijay Srinivasaraghavan
>         Labels: security
> Original Estimate: 672h
> Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._
> Service-level authorization is the initial authorization mechanism to ensure clients (or servers) connecting to the Flink cluster are authorized to do so. The purpose is to prevent a cluster from being used by an unauthorized user, whether to execute jobs, disrupt cluster functionality, or gain access to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc:
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r86564819

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -57,24 +61,37 @@
         // constructor in order to work with the generic deserializer.
        //
    -    static final int HEADER_LENGTH = 4 + 4 + 1; // frame length (4), magic number (4), msg ID (1)
    +    static final int HEADER_LENGTH = 4 + 4 + 4 + 1; // frame length (4), magic number (4), Cookie (4), msg ID (1)
    --- End diff --

    Looks like this is the cookie length
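The diff under review widens the frame header from 9 to 13 bytes by inserting a 4-byte shared-secret cookie between the magic number and the message ID. A self-contained sketch of that layout and the authorization check it enables follows; the constants and method names are made up for illustration, not Flink's actual `NettyMessage` code.

```java
import java.nio.ByteBuffer;

public class FrameHeaderSketch {

    // Header layout from the diff: frame length (4), magic number (4),
    // cookie (4), msg ID (1) => 13 bytes.
    static final int HEADER_LENGTH = 4 + 4 + 4 + 1;
    static final int MAGIC_NUMBER = 0xBADC0FFE;  // placeholder value

    static byte[] encodeHeader(int frameLength, int cookie, byte msgId) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH);
        buf.putInt(frameLength);
        buf.putInt(MAGIC_NUMBER);
        buf.putInt(cookie);
        buf.put(msgId);
        return buf.array();
    }

    // Returns the msg ID if the magic number and cookie check out; throws
    // otherwise. Rejecting frames with a bad cookie is the service-level
    // authorization idea from FLINK-3930.
    static byte decodeHeader(byte[] header, int expectedCookie) {
        ByteBuffer buf = ByteBuffer.wrap(header);
        buf.getInt();  // frame length, not needed for the check
        if (buf.getInt() != MAGIC_NUMBER) {
            throw new IllegalStateException("bad magic number");
        }
        if (buf.getInt() != expectedCookie) {
            throw new IllegalStateException("bad cookie: unauthorized peer");
        }
        return buf.get();
    }

    public static void main(String[] args) {
        byte[] header = encodeHeader(128, 42, (byte) 7);
        System.out.println(header.length);             // 13
        System.out.println(decodeHeader(header, 42));  // 7
    }
}
```

Stephan's comment points out that in the real patch the `4` being added is the cookie's on-wire length, which is exactly the `putInt(cookie)` slot above.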
[jira] [Commented] (FLINK-4997) Extending Window Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636673#comment-15636673 ]

ASF GitHub Bot commented on FLINK-4997:
---------------------------------------

GitHub user VenturaDelMonte opened a pull request:

    https://github.com/apache/flink/pull/2756

    [FLINK-4997] Extending Window Function Metadata

    This PR introduces what was discussed in [FLIP-2](https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata). The WindowedStream apply methods have been overloaded to support ProcessWindowFunction (and its rich counterpart). The streaming runtime internals have been modified to support the new function; full backward compatibility with WindowFunction (and its rich counterpart) is guaranteed by silently wrapping it in a ProcessWindowFunction. The implementation strictly follows what was decided in the FLIP; nothing has been changed for AllWindowedStream. The windows documentation has been overhauled to illustrate the ProcessWindowFunction API.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/VenturaDelMonte/flink flip-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2756.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2756

----
commit 5a33fe2b9e5c23e8529964a489465f51410432df
Author: Ventura Del Monte
Date:   2016-09-21T14:04:00Z

    Merge remote-tracking branch 'apache/master'

commit 39a8bb9433edf5bb9adb850c667b71ef8d25c6b6
Author: Ventura Del Monte
Date:   2016-09-22T08:31:58Z

    Merge remote-tracking branch 'upstream/master'

commit 2507a71e2a40b05b3e5c7507a1c32d6678e07810
Author: Ventura Del Monte
Date:   2016-09-22T09:40:40Z

    Merge remote-tracking branch 'upstream/master'

commit 57d0bca5681e5ea0ba63f3b95fe4f949af3734de
Author: Ventura Del Monte
Date:   2016-10-04T15:03:55Z

    Merge remote-tracking branch 'upstream/master'

commit 9f55a1e3e56b48d9dc5a4d1b3109b41e1c89ce5d
Author: Ventura Del Monte
Date:   2016-11-02T09:14:52Z

    Merge remote-tracking branch 'upstream/master'

commit 9a71d092ad06c3592355ad11dfb7bd4b982ded9f
Author: Ventura Del Monte
Date:   2016-11-03T20:44:56Z

    [FLINK-4997] [streaming] Extending window function metadata introducing ProcessWindowFunction

commit 878983074d364ea0d340bd3497343f286d28e3db
Author: Ventura Del Monte
Date:   2016-11-04T14:29:52Z

    [FLINK-4997] [docs] improved windows documentation explaining the new ProcessWindowFunction API

> Extending Window Function Metadata
> ----------------------------------
>
>            Key: FLINK-4997
>            URL: https://issues.apache.org/jira/browse/FLINK-4997
>        Project: Flink
>     Issue Type: New Feature
>     Components: DataStream API, Streaming, Windowing Operators
>       Reporter: Ventura Del Monte
>       Assignee: Ventura Del Monte
>        Fix For: 1.2.0
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
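The backward-compatibility trick the PR describes, silently wrapping an old-style WindowFunction in a ProcessWindowFunction so the runtime only ever sees the new type, can be sketched as below. The interfaces are heavily reduced stand-ins (no key, window, or Collector types), so everything here is illustrative rather than Flink's real API.

```java
import java.util.*;

public class WindowFunctionWrapperSketch {

    // Old-style function: sees only the window's elements.
    interface WindowFunction<IN, OUT> {
        OUT apply(List<IN> input);
    }

    // New-style function: additionally receives a Context with window metadata.
    interface ProcessWindowFunction<IN, OUT> {
        OUT process(Context ctx, List<IN> input);
    }

    // Minimal metadata carrier; the real FLIP-2 context exposes more.
    static final class Context {
        final long windowEnd;
        Context(long windowEnd) { this.windowEnd = windowEnd; }
    }

    // Wrap an old-style function so the runtime only deals with the new one.
    static <IN, OUT> ProcessWindowFunction<IN, OUT> wrap(WindowFunction<IN, OUT> f) {
        return (ctx, input) -> f.apply(input);  // the extra metadata is simply ignored
    }

    public static void main(String[] args) {
        WindowFunction<Integer, Integer> sum =
                in -> in.stream().mapToInt(Integer::intValue).sum();
        ProcessWindowFunction<Integer, Integer> wrapped = wrap(sum);
        System.out.println(wrapped.process(new Context(1000L), Arrays.asList(1, 2, 3)));  // 6
    }
}
```

The design point is that existing user code keeps compiling and behaving identically, while new code can opt into the metadata by implementing the process-style interface directly.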
[GitHub] flink issue #2740: [FLINK-4964] [ml]
Github user tfournier314 commented on the issue:

    https://github.com/apache/flink/pull/2740

    I've changed my code so that I now have mapping: DataSet[(String, Long)]

    val mapping = input
      .mapWith( s => (s, 1) )
      .groupBy( 0 )
      .reduce( (a, b) => (a._1, a._2 + b._2) )
      .partitionByRange( 1 )
      .zipWithIndex
      .mapWith { case (id, (label, count)) => (label, id) }

    Parsing a new DataSet[String] called rawInput, I'd like to use this mapping to associate with each "label" of rawInput an ID (which is the Long value of mapping). Is it possible with a streaming approach (using a join, for example)?
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636993#comment-15636993 ]

ASF GitHub Bot commented on FLINK-4964:
---------------------------------------

Github user tfournier314 commented on the issue:

    https://github.com/apache/flink/pull/2740

    I've changed my code so that I now have mapping: DataSet[(String, Long)]

    val mapping = input
      .mapWith( s => (s, 1) )
      .groupBy( 0 )
      .reduce( (a, b) => (a._1, a._2 + b._2) )
      .partitionByRange( 1 )
      .zipWithIndex
      .mapWith { case (id, (label, count)) => (label, id) }

    Parsing a new DataSet[String] called rawInput, I'd like to use this mapping to associate with each "label" of rawInput an ID (which is the Long value of mapping). Is it possible with a streaming approach (using a join, for example)?

> FlinkML - Add StringIndexer
> ---------------------------
>
>            Key: FLINK-4964
>            URL: https://issues.apache.org/jira/browse/FLINK-4964
>        Project: Flink
>     Issue Type: New Feature
>       Reporter: Thomas FOURNIER
>       Priority: Minor
>
> Add StringIndexer as described here:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> This will be added in package preprocessing of FlinkML
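The StringIndexer idea in the comment above is: count label frequencies, assign each distinct label a Long index, then map new data through that dictionary (a join or broadcast lookup in a distributed setting). A local sketch with plain Java collections standing in for Flink DataSets is below; the frequency-descending ordering and tie-breaking are assumptions of this sketch, not something the PR specifies.

```java
import java.util.*;
import java.util.stream.*;

public class StringIndexerSketch {

    // Build label -> index, most frequent label first (ties broken alphabetically
    // so the result is deterministic).
    static Map<String, Long> buildMapping(List<String> input) {
        Map<String, Long> counts = input.stream()
                .collect(Collectors.groupingBy(s -> s, Collectors.counting()));
        List<String> labels = new ArrayList<>(counts.keySet());
        labels.sort((a, b) -> {
            int byCount = Long.compare(counts.get(b), counts.get(a));  // descending count
            return byCount != 0 ? byCount : a.compareTo(b);
        });
        Map<String, Long> mapping = new HashMap<>();
        long id = 0;
        for (String label : labels) {
            mapping.put(label, id++);
        }
        return mapping;
    }

    // The "join": replace each raw label with its index (a hash lookup here;
    // on a DataSet this would be a join or a broadcast-set map).
    static List<Long> index(List<String> rawInput, Map<String, Long> mapping) {
        return rawInput.stream().map(mapping::get).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Long> mapping = buildMapping(Arrays.asList("a", "b", "a", "c", "a", "b"));
        System.out.println(index(Arrays.asList("b", "a", "c"), mapping));  // [1, 0, 2]
    }
}
```

In Flink terms, the lookup step answers the commenter's question: the small mapping can be joined with `rawInput` on the label field, or broadcast to a map function when it fits in memory.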
[GitHub] flink issue #2758: support specifying an ESCAPE character in LIKE and SIMILA...
Github user miaoever commented on the issue:

    https://github.com/apache/flink/pull/2758

    @twalthr @wuchong Would you please review my PR to Flink? Thanks :)
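For context on what an ESCAPE character in LIKE means: inside the pattern, the escape character makes the following `%` or `_` a literal instead of a wildcard. The sketch below is a generic translation of that semantics to `java.util.regex`, not the Calcite-based code in the PR.

```java
import java.util.regex.Pattern;

public class LikeEscapeSketch {

    // Evaluate "value LIKE pattern ESCAPE escape" by compiling the LIKE
    // pattern into a regular expression.
    static boolean like(String value, String pattern, char escape) {
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < pattern.length(); i++) {
            char c = pattern.charAt(i);
            if (c == escape && i + 1 < pattern.length()) {
                // escaped character is taken literally, even '%' or '_'
                regex.append(Pattern.quote(String.valueOf(pattern.charAt(++i))));
            } else if (c == '%') {
                regex.append(".*");   // '%' matches any sequence of characters
            } else if (c == '_') {
                regex.append(".");    // '_' matches exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), value);
    }

    public static void main(String[] args) {
        System.out.println(like("100%", "100#%", '#'));  // true: '#%' is a literal '%'
        System.out.println(like("100x", "100#%", '#'));  // false: no wildcard anymore
        System.out.println(like("100x", "100%", '#'));   // true: unescaped '%' is a wildcard
    }
}
```

The same escape rule carries over to SIMILAR TO, where the set of special characters to neutralize is larger.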
[jira] [Closed] (FLINK-3395) Polishing the web UI
[ https://issues.apache.org/jira/browse/FLINK-3395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan closed FLINK-3395.
-----------------------------
    Resolution: Later

> Polishing the web UI
> --------------------
>
>            Key: FLINK-3395
>            URL: https://issues.apache.org/jira/browse/FLINK-3395
>        Project: Flink
>     Issue Type: Improvement
>     Components: Webfrontend
>       Reporter: Greg Hogan
>       Assignee: Greg Hogan
>
> On the job properties page one must select an operator from the plan. Elsewhere in the UI a list of operators is displayed, and clicking the table or the plan will reveal the requested information. A list of operators could likewise be added to the timeline page.
> The job exceptions page should display a "No exceptions" notification, as done elsewhere, when there is nothing to display.
> The job plan is not redrawn when the browser window is resized.
[jira] [Commented] (FLINK-3767) Show timeline also for running tasks, not only for finished ones
[ https://issues.apache.org/jira/browse/FLINK-3767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636907#comment-15636907 ]

Greg Hogan commented on FLINK-3767:
-----------------------------------

I'm running 1.2-SNAPSHOT and this looks to be implemented. [~rmetzger] can you verify?

> Show timeline also for running tasks, not only for finished ones
> ----------------------------------------------------------------
>
>            Key: FLINK-3767
>            URL: https://issues.apache.org/jira/browse/FLINK-3767
>        Project: Flink
>     Issue Type: Sub-task
>     Components: Webfrontend
>       Reporter: Robert Metzger
[jira] [Commented] (FLINK-5020) Make the GenericWriteAheadSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637222#comment-15637222 ]

ASF GitHub Bot commented on FLINK-5020:
---------------------------------------

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2759

    [FLINK-5020] Make the GenericWriteAheadSink rescalable.

    As the issue suggests, this is the final commit to make the GenericWriteAheadSink rescalable. In essence, it replaces the old snapshot()/restore() cycle with the new snapshotState()/initializeState() and adds tests to show that it works as expected.

    R: @zentol

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink rescaling_wr_ahead

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2759.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2759

----
commit 1cdfff0f56a77616f36addfa1cb48b2efe2c9626
Author: kl0u
Date:   2016-10-26T15:19:12Z

    [FLINK-4939] GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

    So far the GenericWriteAheadSink expected that the subtask that wrote a temporary buffer to the state backend would also be the one to commit it to the third-party storage system. This commit removes this assumption. To do so, it changes the CheckpointCommitter to take the subtaskIdx as a parameter when asking whether a checkpoint was committed, and changes the state kept by the GenericWriteAheadSink to also include the index of the subtask that wrote the pending buffer.

commit 77c3892687f78780c19710a95f8830907fe67c86
Author: kl0u
Date:   2016-11-03T11:28:37Z

    Integrated PR comments

commit c8627568c375cdd37ac0a8314fc5bf56077a578e
Author: kl0u
Date:   2016-11-04T15:05:19Z

    Removes redundant safety check.

commit 2b9c28059e84ba09068ed0350680730f77119253
Author: kl0u
Date:   2016-11-03T20:46:58Z

    [FLINK-5020] Make the GenericWriteAheadSink rescalable.

> Make the GenericWriteAheadSink rescalable.
> ------------------------------------------
>
>            Key: FLINK-5020
>            URL: https://issues.apache.org/jira/browse/FLINK-5020
>        Project: Flink
>     Issue Type: Improvement
>     Components: Cassandra Connector
>       Reporter: Kostas Kloudas
>       Assignee: Kostas Kloudas
>        Fix For: 1.2.0
>
> This targets integrating the GenericWriteAheadSink with the new rescalable state abstractions so that the parallelism of the operator can change arbitrarily without jeopardizing the guarantees offered by it.
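The reason the FLINK-4939 change above enables rescaling is that list-style state can be merged and dealt back out to a different number of subtasks on restore, and each pending checkpoint still records which subtask originally wrote it, so the committer can be asked "was checkpoint X of subtask Y committed?" by whoever ends up owning the entry. The round-robin redistribution below is an illustrative simulation, not Flink's actual assignment logic.

```java
import java.util.*;

public class RescaleSketch {

    static final class Pending {
        final long checkpointId;
        final int writerSubtask;  // decoupled from whichever subtask commits it later

        Pending(long checkpointId, int writerSubtask) {
            this.checkpointId = checkpointId;
            this.writerSubtask = writerSubtask;
        }
    }

    // Merge all snapshotted entries and deal them out to the new parallelism.
    static List<List<Pending>> redistribute(List<Pending> allPending, int newParallelism) {
        List<List<Pending>> assignment = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            assignment.add(new ArrayList<>());
        }
        int next = 0;
        for (Pending p : allPending) {
            assignment.get(next++ % newParallelism).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // state written by 3 subtasks, restored with parallelism 2
        List<Pending> all = Arrays.asList(
                new Pending(7L, 0), new Pending(7L, 1), new Pending(7L, 2));
        List<List<Pending>> plan = redistribute(all, 2);
        System.out.println(plan.get(0).size());  // 2
        System.out.println(plan.get(1).size());  // 1
    }
}
```

Because every entry carries its writer's subtask index, correctness does not depend on entries landing back on the subtask that produced them, which is exactly the guarantee rescaling needs.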
[GitHub] flink pull request #2759: [FLINK-5020] Make the GenericWriteAheadSink rescal...
GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2759

    [FLINK-5020] Make the GenericWriteAheadSink rescalable.

    As the issue suggests, this is the final commit to make the GenericWriteAheadSink rescalable. In essence, it replaces the old snapshot()/restore() cycle with the new snapshotState()/initializeState() and adds tests to show that it works as expected.

    R: @zentol

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink rescaling_wr_ahead

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2759.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2759

----
commit 1cdfff0f56a77616f36addfa1cb48b2efe2c9626
Author: kl0u
Date:   2016-10-26T15:19:12Z

    [FLINK-4939] GenericWriteAheadSink: Decouple the creating from the committing subtask for a pending checkpoint

    So far the GenericWriteAheadSink expected that the subtask that wrote a temporary buffer to the state backend would also be the one to commit it to the third-party storage system. This commit removes this assumption. To do so, it changes the CheckpointCommitter to take the subtaskIdx as a parameter when asking whether a checkpoint was committed, and changes the state kept by the GenericWriteAheadSink to also include the index of the subtask that wrote the pending buffer.

commit 77c3892687f78780c19710a95f8830907fe67c86
Author: kl0u
Date:   2016-11-03T11:28:37Z

    Integrated PR comments

commit c8627568c375cdd37ac0a8314fc5bf56077a578e
Author: kl0u
Date:   2016-11-04T15:05:19Z

    Removes redundant safety check.

commit 2b9c28059e84ba09068ed0350680730f77119253
Author: kl0u
Date:   2016-11-03T20:46:58Z

    [FLINK-5020] Make the GenericWriteAheadSink rescalable.
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15637246#comment-15637246 ]

Stephan Ewen commented on FLINK-5012:
-------------------------------------

I just wanted the simple way of outputting an element to be there. If the {{Collector}} were an {{OutputCollector}} (or so) with two methods, {{collect()}} and {{collectWithTimestamp()}}, would that work?

> Provide Timestamp in TimelyFlatMapFunction
> ------------------------------------------
>
>            Key: FLINK-5012
>            URL: https://issues.apache.org/jira/browse/FLINK-5012
>        Project: Flink
>     Issue Type: Improvement
>     Components: Streaming
>       Reporter: Aljoscha Krettek
>
> Right now, {{TimelyFlatMapFunction}} does not give the timestamp of the element in {{flatMap()}}.
> The signature is currently this:
> {code}
> void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
> {code}
> If we add the timestamp it would become this:
> {code}
> void flatMap(I value, Long timestamp, TimerService timerService, Collector<O> out) throws Exception;
> {code}
> The reason why it's a {{Long}} and not a {{long}} is that an element might not have a timestamp, in which case we should hand in {{null}} here.
> This signature is becoming quite long, so we could instead add a {{Context}} parameter that provides access to the timestamp and timer service.
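Stephan's suggestion can be sketched as a two-method collector, where the plain path stays simple and timestamps are opt-in. The interface name, the list-backed recording implementation, and the use of `null` for "no timestamp" (mirroring the `Long` vs `long` point in the issue) are all illustrative assumptions, not Flink's actual API.

```java
import java.util.*;

public class OutputCollectorSketch {

    // Sketch of the proposed OutputCollector: a simple collect() plus an
    // explicit-timestamp variant, instead of threading the timestamp through
    // the flatMap() signature.
    interface OutputCollector<T> {
        void collect(T value);                       // element without a timestamp
        void collectWithTimestamp(T value, long ts); // element with an explicit timestamp
    }

    // Recording implementation for tests: stores (value, timestamp-or-null) pairs.
    static final class RecordingCollector<T> implements OutputCollector<T> {
        final List<Map.Entry<T, Long>> emitted = new ArrayList<>();

        @Override
        public void collect(T value) {
            emitted.add(new AbstractMap.SimpleEntry<>(value, null));
        }

        @Override
        public void collectWithTimestamp(T value, long ts) {
            emitted.add(new AbstractMap.SimpleEntry<>(value, ts));
        }
    }

    public static void main(String[] args) {
        RecordingCollector<String> out = new RecordingCollector<>();
        out.collect("a");
        out.collectWithTimestamp("b", 42L);
        System.out.println(out.emitted.size());             // 2
        System.out.println(out.emitted.get(1).getKey());    // b
        System.out.println(out.emitted.get(1).getValue());  // 42
    }
}
```

Compared with adding a `Long timestamp` parameter to `flatMap()`, this keeps the common no-timestamp case one call with no `null` in sight, which is the trade-off the comment is weighing against the `Context` alternative.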