[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...

2015-03-18 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/471#issuecomment-83049370
  
Thanks for the comments. I'll address the generateSequence comment and open 
an issue for the optimizer deadlock detection and pipeline breaking stuff after 
merging this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...

2015-03-18 Thread uce
Github user uce closed the pull request at:

https://github.com/apache/flink/pull/471




[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...

2015-03-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/471#issuecomment-82554365
  
Looks good, nice test coverage, and it fits very well with the recent execution 
mode changes.

Only downside: this pull request contains many changes where only whitespace 
or the order of imports was modified. This does not really help the reviews, 
clutters the change history of files, and hinders git blame. I know that some 
IDEs do that automatically, but it would be nice to deactivate that in the 
future. Why the hell some IDEs think that adding 100 changed lines to a file 
(where only one typo was fixed) would help is beyond me...

Minor comment: the JoinDeadlock test creates a dedicated input format that 
does not do anything different than `ExecutionEnvironment#generateSequence` 
would do if the source parallelism were set to one.




[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...

2015-03-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/471#issuecomment-82555635
  
Once this is in, we can start removing the deadlock detection in the 
optimizer and the pipeline breaking caches.




[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...

2015-03-17 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/471#issuecomment-82479502
  
I've rebased this on the current master including the `ExecutionMode` 
changes introduced by Stephan.

This allows you to set the type of produced runtime result partitions as 
follows:

```java
final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setExecutionMode(ExecutionMode.BATCH);
```




[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...

2015-03-13 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/471#issuecomment-79017193
  
I've rebased this on the latest master and set the default I/O mode to 
synchronous, i.e. we currently use the simpler synchronous spilled subpartition 
view when consuming intermediate results.

As Stephan pointed out, it makes sense to have the simple version in place 
as long as it is not clear what the benefits of the tricky asynchronous version 
are.
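To illustrate, a synchronous spilled subpartition view can be sketched roughly like this. This is a plain-Java toy model, not Flink's actual implementation; the class and method names are invented. The point is that the consuming side reads the next buffer directly from the spill file and blocks until the read completes, instead of queueing an asynchronous read request:

```java
import java.io.IOException;
import java.io.InputStream;

// Toy model of a "synchronous spilled subpartition view": the consumer
// thread performs a blocking read of the next buffer straight from the
// spill file. (Illustrative sketch only; names are invented.)
public class SyncSpilledView {
    private final InputStream spillFile;
    private final int bufferSize;

    public SyncSpilledView(InputStream spillFile, int bufferSize) {
        this.spillFile = spillFile;
        this.bufferSize = bufferSize;
    }

    /** Blocking read of the next buffer; returns null when the data is exhausted. */
    public byte[] getNextBuffer() throws IOException {
        byte[] buffer = new byte[bufferSize];
        int read = spillFile.read(buffer, 0, bufferSize);
        if (read <= 0) {
            return null; // end of spilled partition
        }
        byte[] result = new byte[read];
        System.arraycopy(buffer, 0, result, 0, read);
        return result;
    }
}
```

The trade-off is exactly the one discussed in this thread: the consuming thread stalls on disk I/O, which is acceptable for a simple first version but not for threads that are shared across many channels.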

The memory configuration has not been changed in this PR, because I don't 
think that it makes too much sense to give more memory to the network stack to 
(maybe) keep blocking results in-memory as long as we don't have any 
functionality in place to leverage these cached partitions.




[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...

2015-03-12 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/471#issuecomment-78481915
  
PS: where we wouldn't need the asynchronous implementation is for local 
reads. There it should be perfectly fine to just synchronously read the spilled 
partitions. 




[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...

2015-03-12 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/471#issuecomment-78479167
  
The root cause of all asynchronous operations is that TCP connections are 
shared among multiple logical channels, which are handled by a fixed number of 
network I/O threads. In case of synchronous I/O operations, we would 
essentially block progress on all channels sharing that connection/thread.

> When do you issue the read requests to the reader (from disk)? Is that 
dependent on when the TCP channel is writable?

Yes, the network I/O thread has subpartitions queued for transfer and only 
queries them for data when the TCP channel is writable.

> When the read request is issued, before the response comes, is the 
subpartition de-registered from Netty and then re-registered once a buffer has 
returned from disk?

Exactly. If there is no buffer available, the read request is issued and 
the next available subpartition is tried. If none of the subpartitions has data 
available, the network I/O thread works on another TCP channel (this is done by 
Netty, which multiplexes all TCP channels over a fixed amount of network I/O 
threads).
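The availability-driven polling described above can be modeled in a few lines of plain Java. This is a deliberately simplified sketch, not Flink's actual network stack: each subpartition is reduced to a queue of buffers, and the "I/O thread" drains whichever subpartitions currently have data, skipping the empty ones instead of blocking on them:

```java
import java.util.List;
import java.util.Queue;

// Simplified model of the scheduling described above: a network I/O thread
// only queries subpartitions that currently have a buffer available and
// skips the rest. (Names and structure are invented for illustration.)
public class SubpartitionPoller {

    /** Drains available buffers round-robin; returns the drain order. */
    public static String drain(List<Queue<String>> subpartitions) {
        StringBuilder order = new StringBuilder();
        boolean progress = true;
        while (progress) {
            progress = false;
            for (Queue<String> subpartition : subpartitions) {
                String buffer = subpartition.poll(); // null = no data available yet
                if (buffer != null) {
                    order.append(buffer).append(' ');
                    progress = true;
                }
            }
        }
        return order.toString();
    }
}
```

In the real system the "skip and come back later" step is where the asynchronous read request comes in: instead of terminating, the I/O thread moves on to another TCP channel and returns to the subpartition once its buffer has arrived from disk.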

> Given many spilled partitions, which one is read from next? How is the 
buffer assignment realized? There is a lot of trickiness in there, because disk 
I/O performs well with longer sequential reads, but that may occupy many 
buffers that are missing for other reads into writable TCP channels.

Initially this depends on the order of partition requests, after that on 
the order of data availability. 
Regarding the buffers: trickiness, indeed. The current state of the 
buffers is kind of an intermediate solution, as we will issue zero-transfer 
reads in the future (requires minimal changes), where we essentially only 
trigger reads to gather offsets. The reads are then only affected by TCP 
channel writability. Currently, the reads are batched in sizes of two buffers 
(64k).



Regarding @tillrohrmann's changes: what were these exactly? Then I can verify 
that they are not undone.

In general (minus the question regarding Till's changes) I think this PR is 
good to merge. The tests are stable and passing. There will definitely be a 
need for refactoring and performance evaluations, but I think that is to be 
expected with such a big change.




[GitHub] flink pull request: [FLINK-1350] [runtime] Add blocking result par...

2015-03-10 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/471

[FLINK-1350] [runtime] Add blocking result partition variant

- **Renames** runtime intermediate result classes (after an offline 
discussion with @StephanEwen I realized that calling subpartitions *queue* was 
rather confusing):
  a) Removes Intermediate prefix
  b) Queue = Subpartition
  c) Iterator = View

  Documentation is coming up soon in FLINK-1373.

- **[FLINK-1350](https://issues.apache.org/jira/browse/FLINK-1350)**: Adds 
a *spillable result subpartition variant* for **BLOCKING results**, which 
writes data to memory first and starts to spill (asynchronously) if not enough 
memory is available to produce the result in-memory only.

  Receiving tasks of BLOCKING results are only deployed after *all* 
partitions have been fully produced. PIPELINED and BLOCKING results cannot be 
mixed.

@StephanEwen, what is your opinion on these two points? We could also start 
deploying receivers of blocking results as soon as a single partition is 
finished and then send the update RPC calls later. The current solution is 
simpler and results in fewer RPC calls (a single call per parallel producer).

- **[FLINK-1359](https://issues.apache.org/jira/browse/FLINK-1359)**: Adds 
**simple state tracking** to result partitions with notifications after 
partitions/subpartitions have been consumed. In normal operation, each 
partition has to be consumed at least once before it can be released.
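The spill-on-memory-pressure behavior described for BLOCKING results can be sketched with a small stdlib-only model. This is not Flink's spillable subpartition (which spills asynchronously through the I/O manager); here a byte stream stands in for the spill file and the memory budget is a plain counter, purely to show the "stay in memory until the budget is exhausted, then spill" decision:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Toy model of a spillable result subpartition: buffers are kept in memory
// until a fixed budget is exceeded, after which new buffers go to secondary
// storage (a byte stream standing in for a spill file). Illustrative only.
public class SpillableSubpartition {
    private final int memoryBudgetBytes;
    private final List<byte[]> inMemory = new ArrayList<>();
    private final ByteArrayOutputStream spillFile = new ByteArrayOutputStream();
    private int inMemoryBytes = 0;

    public SpillableSubpartition(int memoryBudgetBytes) {
        this.memoryBudgetBytes = memoryBudgetBytes;
    }

    public void add(byte[] buffer) throws IOException {
        if (inMemoryBytes + buffer.length <= memoryBudgetBytes) {
            inMemory.add(buffer);          // result may stay in-memory only
            inMemoryBytes += buffer.length;
        } else {
            spillFile.write(buffer);       // budget exhausted: spill
        }
    }

    public int inMemoryBytes() { return inMemoryBytes; }
    public int spilledBytes()  { return spillFile.size(); }
}
```

A consumer of such a partition then reads the in-memory part directly and the spilled part through a view like the synchronous one discussed earlier in the thread.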

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/incubator-flink 
blocking_results-flink-1350

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/471.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #471


commit cbe71d72378dc3a8546dd0322f8712e7352b7a08
Author: Ufuk Celebi u...@apache.org
Date:   2015-01-06T16:11:08Z

[FLINK-1350] [runtime] Add blocking result partition variant

- Renames runtime intermediate result classes:
  a) Removes Intermediate prefix
  b) Queue = Subpartition
  c) Iterator = View

- [FLINK-1350] Adds a spillable result subpartition variant for BLOCKING
  results, which writes data to memory first and starts to spill
  (asynchronously) if not enough memory is available to produce the
  result in-memory only.

  Receiving tasks of BLOCKING results are only deployed after *all*
  partitions have been fully produced. PIPELINED and BLOCKING results cannot
  be mixed.

- [FLINK-1359] Adds simple state tracking to result partitions with
  notifications after partitions/subpartitions have been consumed. Each
  partition has to be consumed at least once before it can be released.

  Currently there is no notion of historic intermediate results, i.e. 
results
  are released as soon as they are consumed.

commit 878cf9cecf41877eb82aa3a34f3266c2d4760bcd
Author: Ufuk Celebi u...@apache.org
Date:   2015-03-09T14:29:06Z

[FLINK-1350] [runtime] Send final task state update only after 
unregistering task

commit 315d96ed93a2b52ab9f0ac6bdf6727f497488478
Author: Ufuk Celebi u...@apache.org
Date:   2015-03-09T15:17:25Z

[FLINK-1350] [runtime] Deploy receivers of blocking results only after all 
producers finished

commit 66f7c4c0286e6b380107d5b7a22568ffbb3a8011
Author: Ufuk Celebi u...@apache.org
Date:   2015-03-09T15:47:17Z

[runtime] Update Netty version to 4.0.26.Final

commit fccfeca84f9bc990f11139cce3cd381d3f04dbd4
Author: Ufuk Celebi u...@apache.org
Date:   2015-03-10T09:50:46Z

[tests] Increase number of network buffers in 
ProcessFailureBatchRecoveryITCase

This is necessary because the network thread has an extra buffer pool into
which it reads data from spilled partitions.



