[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...
Github user uce closed the pull request at: https://github.com/apache/flink/pull/471 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/471#issuecomment-83049370 Thanks for the comments. I'll address the generateSequence comment and open an issue for the optimizer deadlock detection and pipeline breaking stuff after merging this.
[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/471#issuecomment-82555635 Once this is in, we can start removing the deadlock detection in the optimizer and the pipeline breaking caches.
[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/471#issuecomment-82554365 Looks good, nice test coverage, and fits very well with the recent execution mode changes. Only downside: this pull request contains many cases where only whitespace was changed, or the order of imports. This does not really help the reviews, clutters the change history of files, and hinders git blame. I know that some IDEs do that automatically, but it would be nice to deactivate that in the future. Why the hell some IDEs think that adding 100 changed lines to a file (where only one typo was fixed) would help is beyond me... Minor comment: the JoinDeadlock test creates a dedicated input format that does not do anything different than `ExecutionEnvironment#generateSequence` would do, if the source parallelism was set to one.
[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/471#issuecomment-82479502 I've rebased this on the current master including the `ExecutionMode` changes introduced by Stephan. This allows you to set the type of produced runtime result partitions as follows:

```java
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setExecutionMode(ExecutionMode.BATCH);
```
[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/471#issuecomment-79017193 I've rebased this on the latest master and set the default I/O mode to synchronous, i.e. we currently use the simpler synchronous spilled subpartition view when consuming intermediate results. As Stephan pointed out, it makes sense to have the simple version in place as long as it is not clear what the benefits of the tricky asynchronous version are. The memory configuration has not been changed in this PR, because I don't think it makes much sense to give more memory to the network stack to (maybe) keep blocking results in-memory, as long as we don't have any functionality in place to leverage these cached partitions.
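The synchronous spilled subpartition view mentioned above can be illustrated with a minimal, self-contained sketch (plain Java, not Flink's actual classes — all names here are hypothetical): buffers previously spilled to a file are read back one at a time on the calling thread, blocking on disk I/O, with no asynchronous callbacks involved.

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

// Hypothetical sketch of a synchronous "view" over a spilled subpartition:
// buffers were written to a file at spill time; the consumer reads them
// back one per call, blocking on disk I/O (no async machinery needed).
public class SyncSpilledViewSketch {

    /** Reads length-prefixed buffers from the spill file, one per call. */
    static final class SpilledSubpartitionView implements Closeable {
        private final DataInputStream in;

        SpilledSubpartitionView(Path spillFile) throws IOException {
            this.in = new DataInputStream(
                    new BufferedInputStream(Files.newInputStream(spillFile)));
        }

        /** Returns the next buffer, or null when the partition is exhausted. */
        byte[] getNextBuffer() throws IOException {
            int len;
            try {
                len = in.readInt();
            } catch (EOFException e) {
                return null; // all spilled buffers consumed
            }
            byte[] buf = new byte[len];
            in.readFully(buf);
            return buf;
        }

        @Override public void close() throws IOException { in.close(); }
    }

    public static void main(String[] args) throws IOException {
        // Spill three buffers to a temp file (what the producer side would do).
        Path file = Files.createTempFile("spill", ".bin");
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(file))) {
            for (String s : new String[] {"a", "bb", "ccc"}) {
                byte[] b = s.getBytes("UTF-8");
                out.writeInt(b.length);
                out.write(b);
            }
        }

        // Consume them synchronously on the calling thread.
        List<String> read = new ArrayList<>();
        try (SpilledSubpartitionView view = new SpilledSubpartitionView(file)) {
            byte[] buf;
            while ((buf = view.getNextBuffer()) != null) {
                read.add(new String(buf, "UTF-8"));
            }
        }
        System.out.println(read); // [a, bb, ccc]
        Files.delete(file);
    }
}
```

For local reads (same machine, no network thread in between) this blocking style is unproblematic, which matches the argument made in the thread for keeping the simple version as the default.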
[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/471#issuecomment-78481915 PS: One place where we wouldn't need the asynchronous implementation is local reads. There it should be perfectly fine to just synchronously read the spilled partitions.
[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/471#issuecomment-78479167 The root "cause" of all asynchronous operations is that TCP connections are shared among multiple logical channels, which are handled by a fixed number of network I/O threads. With synchronous I/O operations, we would essentially block progress on all channels sharing that connection/thread. > When do you issue the read requests to the reader (from disk)? Is that dependent on when the TCP channel is writable? Yes, the network I/O thread has subpartitions queued for transfer and only queries them for data when the TCP channel is writable. > When the read request is issued, before the response comes, is the subpartition de-registered from Netty and then re-registered once a buffer has returned from disk? Exactly. If there is no buffer available, the read request is issued and the next available subpartition is tried. If none of the subpartitions has data available, the network I/O thread works on another TCP channel (this is done by Netty, which multiplexes all TCP channels over a fixed number of network I/O threads). > Given many spilled partitions, which one is read from next? How is the buffer assignment realized? There is a lot of trickiness in there, because disk I/O performs well with longer sequential reads, but that may occupy many buffers that are missing for other reads into writable TCP channels. Initially this depends on the order of partition requests, after that on the order of data availability. Regarding the buffers: trickiness, indeed. The current state with the buffers is kind of an intermediate solution, as we will issue zero-transfer reads in the future (this requires minimal changes), where we essentially only trigger reads to gather offsets. The reads are then only affected by TCP channel writability. Currently, the reads are batched in sizes of two buffers (64k). Regarding @tillrohrmann's changes: what was this exactly?
Then I can verify that the changes are not undone. In general (minus the question regarding Till's changes), I think this PR is good to merge. The tests are stable and passing. There will definitely be a need for refactorings and performance evaluations, but I think that is to be expected with such a big change.
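The scheduling idea described in the answer above — a network I/O thread that serves queued subpartitions only while the TCP channel is writable, and never blocks on a subpartition whose buffer has not come back from disk yet — can be sketched as a small, self-contained simulation. This is plain Java with invented names, not Flink's or Netty's actual classes; it only models the control flow.

```java
import java.util.*;

// Hypothetical sketch of writability-driven draining: the network thread
// round-robins over queued subpartitions, ships whatever buffer is ready,
// skips (rather than blocks on) subpartitions with no data yet, and stops
// as soon as the channel stops being writable.
public class WritabilityDrivenWriterSketch {

    interface Subpartition {
        /** Non-blocking: returns a buffer if one is ready, else null. */
        String pollBuffer();
    }

    static final class Channel {
        private final int writableBudget; // writes that fit before backpressure
        private int written;
        private final List<String> sent = new ArrayList<>();

        Channel(int writableBudget) { this.writableBudget = writableBudget; }
        boolean isWritable() { return written < writableBudget; }
        void write(String buffer) { written++; sent.add(buffer); }
        List<String> sent() { return sent; }
    }

    /** One round of the network thread: serve queued subpartitions while writable. */
    static void drain(Deque<Subpartition> queue, Channel channel) {
        int misses = 0;
        while (channel.isWritable() && misses < queue.size()) {
            Subpartition sp = queue.pollFirst();
            String buf = sp.pollBuffer();
            if (buf != null) {
                channel.write(buf); // data ready: ship it
                misses = 0;
            } else {
                misses++;           // nothing ready: don't block, try the next one
            }
            queue.addLast(sp);      // round-robin re-registration
        }
        // When the channel is no longer writable, or no subpartition has data,
        // the thread moves on to other TCP channels (Netty multiplexes them).
    }

    public static void main(String[] args) {
        Deque<String> a = new ArrayDeque<>(Arrays.asList("a1", "a2"));
        Deque<String> b = new ArrayDeque<>(); // spilled, nothing read back yet
        Deque<String> c = new ArrayDeque<>(Arrays.asList("c1"));

        Deque<Subpartition> queue = new ArrayDeque<>(
                Arrays.asList(a::pollFirst, b::pollFirst, c::pollFirst));
        Channel channel = new Channel(10);

        drain(queue, channel);
        // Subpartition b is skipped without blocking the others.
        System.out.println(channel.sent()); // [a1, c1, a2]
    }
}
```

The key property, per the thread, is that a subpartition without a ready buffer costs the network thread essentially nothing: it just triggers a read request and the thread proceeds to the next candidate.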
[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/471#issuecomment-78474538 Concerning the questions: 1. I think deploying after all blocking producers are finished is what we should go for as a start. It is also what people would expect from a blocking model. 2. This is a fair initial restriction. Let's relax that later; I can see benefits in that when dealing with tasks that cannot be deployed due to a lack of resources. A few questions: The pull request generifies the IOManager and uses asynchronous disk I/O for the intermediate result spilling. Is there any experience indicating that this helps performance in the case here? I am curious, because the async I/O in the hash join / sorters was tricky enough. The interaction between asynchronous disk I/O and asynchronous network I/O must be very tricky. I think there should be a good reason to do this, otherwise we simply introduce error-prone code for a completely unknown benefit. The asynchronous writing seems straightforward. For the reading / sending part: - When do you issue the read requests to the reader (from disk)? Is that dependent on when the TCP channel is writable? - When the read request is issued, before the response comes, is the subpartition de-registered from Netty and then re-registered once a buffer has returned from disk? - Given many spilled partitions, which one is read from next? How is the buffer assignment realized? There is a lot of trickiness in there, because disk I/O performs well with longer sequential reads, but that may occupy many buffers that are missing for other reads into writable TCP channels. Can you elaborate on the mechanism behind this? I expect this to have quite an impact on the reliability of the mechanism and the performance. *IMPORTANT*: There has been a fix by @tillrohrmann to the asynchronous channel readers / writers a few weeks back. Are we sure that this is not undone by the changes here?
[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/471 [FLINK-1350] [runtime] Add blocking result partition variant - **Renames** runtime intermediate result classes (after an offline discussion with @StephanEwen I realized that calling subpartitions *queues* was rather confusing): a) Removes the "Intermediate" prefix b) Queue => Subpartition c) Iterator => View Documentation is coming up soon in FLINK-1373. - **[FLINK-1350](https://issues.apache.org/jira/browse/FLINK-1350)**: Adds a *spillable result subpartition variant* for **BLOCKING results**, which writes data to memory first and starts to spill (asynchronously) if not enough memory is available to produce the result in-memory only. Receiving tasks of BLOCKING results are only deployed after *all* partitions have been fully produced. PIPELINED and BLOCKING results cannot be mixed. @StephanEwen, what is your opinion on these two points? We could also start deploying receivers of blocking results as soon as a single partition is finished and then send the update RPC calls later. The current solution is simpler and results in fewer RPC calls (a single one per parallel producer). - **[FLINK-1359](https://issues.apache.org/jira/browse/FLINK-1359)**: Adds **simple state tracking** to result partitions with notifications after partitions/subpartitions have been consumed. In normal operation, each partition has to be consumed at least once before it can be released.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/uce/incubator-flink blocking_results-flink-1350 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/471.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #471 commit cbe71d72378dc3a8546dd0322f8712e7352b7a08 Author: Ufuk Celebi Date: 2015-01-06T16:11:08Z [FLINK-1350] [runtime] Add blocking result partition variant - Renames runtime intermediate result classes: a) Removes "Intermediate" prefix b) Queue => Subpartition c) Iterator => View - [FLINK-1350] Adds a spillable result subpartition variant for BLOCKING results, which writes data to memory first and starts to spill (asynchronously) if not enough memory is available to produce the result in-memory only. Receiving tasks of BLOCKING results are only deployed after *all* partitions have been fully produced. PIPELINED and BLOCKING results can not be mixed. - [FLINK-1359] Adds simple state tracking to result partitions with notifications after partitions/subpartitions have been consumed. Each partition has to be consumed at least once before it can be released. Currently there is no notion of historic intermediate results, i.e. results are released as soon as they are consumed. 
commit 878cf9cecf41877eb82aa3a34f3266c2d4760bcd Author: Ufuk Celebi Date: 2015-03-09T14:29:06Z [FLINK-1350] [runtime] Send final task state update only after unregistering task commit 315d96ed93a2b52ab9f0ac6bdf6727f497488478 Author: Ufuk Celebi Date: 2015-03-09T15:17:25Z [FLINK-1350] [runtime] Deploy receivers of blocking results only after all producers finished commit 66f7c4c0286e6b380107d5b7a22568ffbb3a8011 Author: Ufuk Celebi Date: 2015-03-09T15:47:17Z [runtime] Update Netty version to 4.0.26.Final commit fccfeca84f9bc990f11139cce3cd381d3f04dbd4 Author: Ufuk Celebi Date: 2015-03-10T09:50:46Z [tests] Increase number of network buffers in ProcessFailureBatchRecoveryITCase This is necessary, because the network thread has an extra buffer pool to read data into from spilled partitions.
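The two core behaviours of the spillable BLOCKING subpartition described in the pull request — write to memory while a budget lasts, overflow to disk when it runs out, and hand a view to the consumer only once the result is fully produced — can be sketched with a minimal, self-contained model. All class and method names here are hypothetical illustrations, not Flink's actual runtime classes (the real variant spills asynchronously; this sketch spills synchronously to keep it short).

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

// Hypothetical sketch of a spillable BLOCKING subpartition: buffers stay
// in memory while a budget lasts, then overflow into a spill file; a
// consumer view is only available after finish(), i.e. after the result
// has been fully produced.
public class SpillableSubpartitionSketch {
    private final int memoryBudgetBuffers;
    private final List<byte[]> inMemory = new ArrayList<>();
    private Path spillFile;            // created lazily on first overflow
    private DataOutputStream spillOut;
    private boolean finished;

    SpillableSubpartitionSketch(int memoryBudgetBuffers) {
        this.memoryBudgetBuffers = memoryBudgetBuffers;
    }

    void add(byte[] buffer) throws IOException {
        if (finished) throw new IllegalStateException("already finished");
        if (inMemory.size() < memoryBudgetBuffers) {
            inMemory.add(buffer);      // keep in memory while the budget lasts
        } else {
            if (spillOut == null) {    // budget exhausted: start spilling
                spillFile = Files.createTempFile("subpartition", ".spill");
                spillOut = new DataOutputStream(
                        new BufferedOutputStream(Files.newOutputStream(spillFile)));
            }
            spillOut.writeInt(buffer.length);
            spillOut.write(buffer);
        }
    }

    void finish() throws IOException {
        if (spillOut != null) spillOut.close();
        finished = true;
    }

    /** Consumers may only read once the partition is fully produced (BLOCKING). */
    List<byte[]> createView() throws IOException {
        if (!finished) throw new IllegalStateException("BLOCKING result not fully produced yet");
        List<byte[]> all = new ArrayList<>(inMemory);
        if (spillFile != null) {
            try (DataInputStream in = new DataInputStream(
                    new BufferedInputStream(Files.newInputStream(spillFile)))) {
                while (true) {
                    int len;
                    try { len = in.readInt(); } catch (EOFException e) { break; }
                    byte[] b = new byte[len];
                    in.readFully(b);
                    all.add(b);
                }
            }
        }
        return all;
    }

    public static void main(String[] args) throws IOException {
        SpillableSubpartitionSketch sp = new SpillableSubpartitionSketch(2);
        for (int i = 0; i < 5; i++) {
            sp.add(new byte[] {(byte) i}); // 2 buffers stay in memory, 3 spill
        }
        sp.finish();
        System.out.println(sp.createView().size()); // 5
    }
}
```

The `finish()` gate models the deployment rule discussed in the thread: receivers of BLOCKING results are only deployed after all producer partitions have been fully produced.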