[jira] [Updated] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-13 Thread Matthew Barlocker (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Barlocker updated FLINK-4617:
-
Affects Version/s: 1.0.1
   1.0.2
   1.0.3

> Kafka & Flink duplicate messages on restart
> ---
>
> Key: FLINK-4617
> URL: https://issues.apache.org/jira/browse/FLINK-4617
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
> Environment: Ubuntu 16.04
> Flink 1.1.*
> Kafka 0.9.0.1
> Scala 2.11.7
> Java 1.8.0_91
>Reporter: Matthew Barlocker
>Priority: Critical
>
> [StackOverflow 
> Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]
> Flink (the kafka connector) re-runs the last 3-9 messages it saw before it 
> was shut down.
> *My code:*
> {code}
> import java.util.Properties
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.CheckpointingMode
> import org.apache.flink.streaming.connectors.kafka._
> import org.apache.flink.streaming.util.serialization._
> import org.apache.flink.runtime.state.filesystem._
>
> object Runner {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.enableCheckpointing(500)
>     env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
>     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "localhost:9092")
>     properties.setProperty("group.id", "testing")
>
>     val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
>     val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
>
>     env.addSource(kafkaConsumer)
>       .addSink(kafkaProducer)
>
>     env.execute()
>   }
> }
> {code}
> *My sbt dependencies:*
> {code}
> libraryDependencies ++= Seq(
> "org.apache.flink" %% "flink-scala" % "1.1.2",
> "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
> "org.apache.flink" %% "flink-clients" % "1.1.2",
> "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
> "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
> )
> {code}
> *My process:*
> using 3 terminals:
> {code}
> TERM-1 start sbt, run program
> TERM-2 create kafka topics testing-in and testing-out
> TERM-2 run kafka-console-producer on testing-in topic
> TERM-3 run kafka-console-consumer on testing-out topic
> TERM-2 send data to kafka producer.
> Wait for a couple seconds (buffers need to flush)
> TERM-3 watch data appear in testing-out topic
> Wait for at least 500 milliseconds for checkpointing to happen
> TERM-1 stop sbt
> TERM-1 run sbt
> TERM-3 watch last few lines of data appear in testing-out topic
> {code}
> *My expectations:*
> When there are no errors in the system, I expect to be able to turn flink on 
> and off without reprocessing messages that successfully completed the stream 
> in a prior run.
> *My attempts to fix:*
> I've added the call to setStateBackend, thinking that perhaps the default 
> memory backend just didn't remember correctly. That didn't seem to help.
> I've removed the call to enableCheckpointing, hoping that perhaps there was a 
> separate mechanism to track state in Flink vs Zookeeper. That didn't seem to 
> help.
> I've used different sinks (RollingFileSink, print()), hoping that maybe the 
> bug was in Kafka. That didn't seem to help.
> I've rolled back to Flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
> maybe the bug was in the latest version. That didn't seem to help.
> I've added the zookeeper.connect config to the properties object, hoping that 
> the comment about it only being useful in 0.8 was wrong. That didn't seem to 
> help.
> I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea 
> drfloob). That didn't seem to help.





[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method

2016-09-13 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15488555#comment-15488555
 ] 

Josep Rubió commented on FLINK-1707:


Hi [~vkalavri],

I'm not sure I understand what you mean. Won't the matrix need to be read 
anyway?

Would providing a DataSet of Tuple3 (id1, id2, similarity) and creating the 
vertices and edges through transformations work? The function should check that 
no duplicates exist and fill missing similarities with a value of 0.
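
If it helps the discussion, here is a sketch of what that input could look like 
with Gelly's Tuple3 support (edge values holding the similarities, vertices 
derived from the edge endpoints; the data is illustrative only):

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// (id1, id2, similarity) triples; missing similarities filled with 0.
DataSet<Tuple3<Long, Long, Double>> similarities = env.fromElements(
        Tuple3.of(1L, 2L, 0.5),
        Tuple3.of(2L, 3L, 0.0));

// Gelly builds the vertex set from the edge endpoints.
Graph<Long, NullValue, Double> graph = Graph.fromTupleDataSet(similarities, env);
{code}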

> Add an Affinity Propagation Library Method
> --
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Josep Rubió
>Priority: Minor
>  Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation 
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in [1] and a description of a vertex-centric 
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing
> Example spreadsheet:
> https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing





[jira] [Created] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-13 Thread Matthew Barlocker (JIRA)
Matthew Barlocker created FLINK-4617:


 Summary: Kafka & Flink duplicate messages on restart
 Key: FLINK-4617
 URL: https://issues.apache.org/jira/browse/FLINK-4617
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, State Backends, Checkpointing
Affects Versions: 1.1.2, 1.1.1, 1.1.0
 Environment: Ubuntu 16.04
Flink 1.1.*
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
Reporter: Matthew Barlocker
Priority: Critical


[StackOverflow 
Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]

Flink (the kafka connector) re-runs the last 3-9 messages it saw before it was 
shut down.

*My code:*
{code}
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "testing")

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())

    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}
{code}

*My sbt dependencies:*
{code}
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % "1.1.2",
"org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
"org.apache.flink" %% "flink-clients" % "1.1.2",
"org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
"org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)
{code}

*My process:*
using 3 terminals:
{code}
TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic
{code}

*My expectations:*

When there are no errors in the system, I expect to be able to turn flink on 
and off without reprocessing messages that successfully completed the stream in 
a prior run.
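
For context on why the duplicates appear at all: Flink's exactly-once guarantee 
covers operator state, and on restart the consumer replays records from the 
last checkpointed offsets, so a non-transactional sink re-emits whatever it 
produced after that checkpoint. A minimal consumer-side workaround sketch, 
assuming records carry a unique key (in-memory only, so explicitly not fault 
tolerant; an illustration, not a fix):

{code}
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DedupFilter extends RichFlatMapFunction<String, String> {
    private transient Set<String> seen;

    @Override
    public void open(Configuration parameters) {
        seen = new HashSet<>();
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        // Set.add returns false when the element was already present,
        // so replayed records are dropped instead of re-emitted.
        if (seen.add(value)) {
            out.collect(value);
        }
    }
}
{code}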

*My attempts to fix:*

I've added the call to setStateBackend, thinking that perhaps the default 
memory backend just didn't remember correctly. That didn't seem to help.

I've removed the call to enableCheckpointing, hoping that perhaps there was a 
separate mechanism to track state in Flink vs Zookeeper. That didn't seem to 
help.

I've used different sinks (RollingFileSink, print()), hoping that maybe the bug 
was in Kafka. That didn't seem to help.

I've rolled back to Flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
maybe the bug was in the latest version. That didn't seem to help.

I've added the zookeeper.connect config to the properties object, hoping that 
the comment about it only being useful in 0.8 was wrong. That didn't seem to 
help.

I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea drfloob). 
That didn't seem to help.





[jira] [Commented] (FLINK-4608) Use short-circuit AND in Max/Min AggregationFunction

2016-09-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15488223#comment-15488223
 ] 

ASF GitHub Bot commented on FLINK-4608:
---

Github user apivovarov commented on the issue:

https://github.com/apache/flink/pull/2489
  
@zentol Can you merge it?


> Use short-circuit AND in Max/Min AggregationFunction
> 
>
> Key: FLINK-4608
> URL: https://issues.apache.org/jira/browse/FLINK-4608
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> Max/Min AggregationFunction use & instead of &&. Usually we use short-circuit 
> logic in if operators in java
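
For reference, a small illustration of the difference (plain Java semantics, 
not Flink code): & always evaluates both operands, while && skips the 
right-hand side once the left is false.

{code}
public class ShortCircuitDemo {
    public static void main(String[] args) {
        int[] a = new int[0];

        // '&&' short-circuits: the right-hand side never runs, so the
        // out-of-bounds access is safely skipped.
        boolean safe = a.length > 0 && a[0] > 5;   // false

        // '&' evaluates both sides unconditionally; this line throws
        // ArrayIndexOutOfBoundsException.
        boolean eager = a.length > 0 & a[0] > 5;
    }
}
{code}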





[GitHub] flink issue #2489: [FLINK-4608] Use short-circuit AND in Max/Min Aggregation...

2016-09-13 Thread apivovarov
Github user apivovarov commented on the issue:

https://github.com/apache/flink/pull/2489
  
@zentol Can you merge it?




[GitHub] flink issue #2488: [FLINK-4607] Close FileInputStream in ParameterTool and o...

2016-09-13 Thread apivovarov
Github user apivovarov commented on the issue:

https://github.com/apache/flink/pull/2488
  
@zentol Time to merge it?




[jira] [Commented] (FLINK-4609) Remove redundant check for null in CrossOperator

2016-09-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15488221#comment-15488221
 ] 

ASF GitHub Bot commented on FLINK-4609:
---

Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2490#discussion_r78631478
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---
@@ -129,14 +129,11 @@ private String getDefaultName() {

	public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, CrossHint hint, String defaultName) {

-		super(input1, input2, new DefaultCrossFunction<I1, I2>(),
+		super(Preconditions.checkNotNull(input1, "input1 is null"),
--- End diff --

@greghogan 


> Remove redundant check for null in CrossOperator
> 
>
> Key: FLINK-4609
> URL: https://issues.apache.org/jira/browse/FLINK-4609
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> CrossOperator checks input1 and input2 for null after they were dereferenced
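
For context, a toy sketch of why the fix inlines the checks (Parent/Child are 
hypothetical, not Flink classes): Java requires super(...) to be the first 
statement in a constructor, and the Guava-style Preconditions.checkNotNull 
returns its argument, so validation can run before anything is dereferenced.

{code}
class Parent {
    Parent(Object input) { /* may dereference input immediately */ }
}

class Child extends Parent {
    Child(Object input) {
        // super(...) must come first, so the null check is inlined;
        // checkNotNull throws on null and otherwise returns its argument.
        super(Preconditions.checkNotNull(input, "input is null"));
    }
}
{code}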





[jira] [Commented] (FLINK-4607) Close FileInputStream in ParameterTool and other

2016-09-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15488225#comment-15488225
 ] 

ASF GitHub Bot commented on FLINK-4607:
---

Github user apivovarov commented on the issue:

https://github.com/apache/flink/pull/2488
  
@zentol Time to merge it?


> Close FileInputStream in ParameterTool and other
> 
>
> Key: FLINK-4607
> URL: https://issues.apache.org/jira/browse/FLINK-4607
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.2
>Reporter: Alexander Pivovarov
>Priority: Trivial
>
> ParameterTool and some tests do not close FileInputStream
> {code}
> flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
> flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
> flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
> flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
> flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
> {code}
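
The idiomatic fix for this pattern is try-with-resources, which closes the 
stream on all paths, including exceptions (a sketch assuming the usual 
Properties-loading shape, not the exact patch):

{code}
Properties props = new Properties();
// The FileInputStream is closed automatically when the block exits.
try (FileInputStream fis = new FileInputStream(path)) {
    props.load(fis);
}
{code}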





[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...

2016-09-13 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2490#discussion_r78631478
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---
@@ -129,14 +129,11 @@ private String getDefaultName() {

	public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, CrossHint hint, String defaultName) {

-		super(input1, input2, new DefaultCrossFunction<I1, I2>(),
+		super(Preconditions.checkNotNull(input1, "input1 is null"),
--- End diff --

@greghogan 




[jira] [Assigned] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-13 Thread ramkrishna.s.vasudevan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ramkrishna.s.vasudevan reassigned FLINK-3322:
-

Assignee: ramkrishna.s.vasudevan

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
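
For reference, the preallocation switch the description refers to lives in 
flink-conf.yaml; a sketch of the two settings mentioned above (values 
illustrative):

{code}
# When true, the MemoryManager allocates its segments at startup and returns
# released segments to a pool instead of leaving them to the GC.
taskmanager.memory.preallocate: true

# Fraction of free memory handed to the MemoryManager (default 0.7).
taskmanager.memory.fraction: 0.7
{code}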





[jira] [Commented] (FLINK-3869) WindowedStream.apply with FoldFunction is too restrictive

2016-09-13 Thread Dan Bress (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15487509#comment-15487509
 ] 

Dan Bress commented on FLINK-3869:
--

I know this is slated for 2.0, but just wanted to drop a comment saying that I 
am running into an issue where this fix would be beneficial to me.

> WindowedStream.apply with FoldFunction is too restrictive
> -
>
> Key: FLINK-3869
> URL: https://issues.apache.org/jira/browse/FLINK-3869
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Right now we have this signature:
> {code}
> public <R> SingleOutputStreamOperator<R> apply(R initialValue, 
> FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function) {
> {code}
> but we should have this signature to allow users to return a type other than 
> the fold accumulator type from their window function:
> {code}
> public <R, ACC> SingleOutputStreamOperator<R> apply(ACC initialValue, 
> FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> function) {
> {code}
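
To make the restriction concrete, a hypothetical Java call that the proposed 
signature would permit but the current one rejects: folding into a Long count 
(ACC) while emitting a String (R) from the window function. Event and its key 
field are illustrative.

{code}
DataStream<String> summaries = events
    .keyBy("user")
    .timeWindow(Time.seconds(5))
    .apply(0L,
        new FoldFunction<Event, Long>() {
            @Override
            public Long fold(Long count, Event e) {
                return count + 1;   // ACC = Long
            }
        },
        new WindowFunction<Long, String, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window,
                              Iterable<Long> counts, Collector<String> out) {
                // R = String: a different type than the accumulator.
                out.collect(key + " count=" + counts.iterator().next());
            }
        });
{code}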





[jira] [Created] (FLINK-4616) Kafka consumer doesn't store last emitted watermarks per partition in state

2016-09-13 Thread Yuri Makhno (JIRA)
Yuri Makhno created FLINK-4616:
--

 Summary: Kafka consumer doesn't store last emitted watermarks per 
partition in state
 Key: FLINK-4616
 URL: https://issues.apache.org/jira/browse/FLINK-4616
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.1.1
Reporter: Yuri Makhno
 Fix For: 1.2.0, 1.1.3


The Kafka consumer stores only Kafka offsets in its state and doesn't store the 
last emitted watermarks; this can lead to a wrong state when a checkpoint is 
restored:

Let's say our watermark is (timestamp - 10). With the following message queue, 
the results after a checkpoint restore will differ from those during normal 
processing:

A(ts = 30)
B(ts = 35)
-- checkpoint goes here
C(ts=15) -- this one should be filtered by the next time window
D(ts=60)
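
To make the failure mode concrete, a minimal sketch (with a hypothetical Event 
type carrying a ts field) of the "timestamp - 10" watermarking described above. 
The maxTs field is plain JVM state, not part of any checkpoint, so after a 
restore the watermark falls back below 25 and the replayed C(ts=15) is no 
longer treated as late:

{code}
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TsAssigner implements AssignerWithPeriodicWatermarks<Event> {
    private long maxTs = Long.MIN_VALUE;   // not checkpointed: lost on restore

    @Override
    public long extractTimestamp(Event e, long previousElementTimestamp) {
        maxTs = Math.max(maxTs, e.ts);
        return e.ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(maxTs - 10);  // the "timestamp - 10" rule
    }
}
{code}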





[jira] [Created] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-13 Thread ramkrishna.s.vasudevan (JIRA)
ramkrishna.s.vasudevan created FLINK-4615:
-

 Summary: Reusing the memory allocated for the drivers and iterators
 Key: FLINK-4615
 URL: https://issues.apache.org/jira/browse/FLINK-4615
 Project: Flink
  Issue Type: Sub-task
Reporter: ramkrishna.s.vasudevan
Assignee: ramkrishna.s.vasudevan


Raising this as a subtask so that the changes can be committed individually and 
reviewed more closely.





[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15487234#comment-15487234
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/2496

FLINK-4615 Reusing the memory allocated for the drivers and iterators

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

First PR for initial review. It helps to reuse the memory for the iterators 
that are created by the drivers. If someone could point me to more test cases 
other than the ConnectedComponents example, I can check how things work. 
Suggestions/feedback are welcome. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-4615

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2496.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2496


commit b4440ed2a4cd7484ae90562a49a5af0336b41902
Author: Ramkrishna 
Date:   2016-09-13T13:39:58Z

FLINK-4615 Reusing the memory allocated for the drivers and iterators




> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising this as a subtask so that the changes can be committed individually 
> and reviewed more closely.





[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

2016-09-13 Thread ramkrish86
GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/2496

FLINK-4615 Reusing the memory allocated for the drivers and iterators

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

First PR for initial review. It helps to reuse the memory for the iterators 
that are created by the drivers. If someone could point me to more test cases 
other than the ConnectedComponents example, I can check how things work. 
Suggestions/feedback are welcome. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-4615

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2496.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2496


commit b4440ed2a4cd7484ae90562a49a5af0336b41902
Author: Ramkrishna 
Date:   2016-09-13T13:39:58Z

FLINK-4615 Reusing the memory allocated for the drivers and iterators






[jira] [Updated] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-13 Thread ramkrishna.s.vasudevan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ramkrishna.s.vasudevan updated FLINK-3322:
--
Attachment: FLINK-3322_reusingmemoryfordrivers.docx

Attaching a doc with two approaches for reusing the memory for the drivers. 

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)





[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-13 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15487192#comment-15487192
 ] 

ramkrishna.s.vasudevan commented on FLINK-3322:
---

[~ggevay]
I think I was able to make the driver-related changes as you suggested, but 
with slight modifications. I will update a doc for that and also open another 
PR with only the driver-related changes. If needed I can combine the already 
submitted PR and the new one.

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)





[GitHub] flink pull request #2495: FLINK-3322 - Make sorters to reuse the memory page...

2016-09-13 Thread ramkrish86
GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/2495

FLINK-3322 - Make sorters to reuse the memory pages allocated for iterative 
tasks

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


This is part 1 of FLINK-3322, where only the sorters are made to reuse the 
memory pages. As @ggevay pointed out, we also have to handle the iterators 
where the memory pages are allocated. I have a separate PR for that because it 
involves touching a lot of places, but I am open to feedback here. Combining 
both is fine with me too, but it made the changes much bigger. 
I would like to get feedback on this approach. 
Here a SorterMemoryAllocator is passed to the UnilateralSortMergers. It 
allocates the required memory pages, including the required read, write, and 
large buffers. As per the existing logic the buffers are released, but if the 
task is an iterative task we defer the release until a close or termination 
call happens for the iterative task. 
Pages that were grabbed in between for key sort or record sort are put back 
into their respective pools, so that we keep the required number of pages 
throughout the life cycle of the iterative task.

As said, this is only part 1. We still need to address the iterators, but in 
my view that touches more places. I have done those changes, but they are not 
yet in shape to be pushed as a PR; I am open to feedback here. Thanks 
all. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-3322_part1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2495


commit 705ee5294bc5263971c2924a55c9230d72806527
Author: Ramkrishna 
Date:   2016-09-13T06:33:59Z

FLINK-3322 - Make sorters to reuse the memory pages allocated for
iterative tasks






[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15486449#comment-15486449
 ] 

ASF GitHub Bot commented on FLINK-3322:
---

GitHub user ramkrish86 opened a pull request:

https://github.com/apache/flink/pull/2495

FLINK-3322 - Make sorters to reuse the memory pages allocated for iterative 
tasks

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


This is part 1 of FLINK-3322, where only the sorters are made to reuse the 
memory pages. As @ggevay pointed out, we also have to handle the iterators 
where the memory pages are allocated. I have a separate PR for that because it 
involves touching a lot of places, but I am open to feedback here. Combining 
both is fine with me too, but it made the changes much bigger. 
I would like to get feedback on this approach. 
Here a SorterMemoryAllocator is passed to the UnilateralSortMergers. It 
allocates the required memory pages, including the required read, write, and 
large buffers. As per the existing logic the buffers are released, but if the 
task is an iterative task we defer the release until a close or termination 
call happens for the iterative task. 
Pages that were grabbed in between for key sort or record sort are put back 
into their respective pools, so that we keep the required number of pages 
throughout the life cycle of the iterative task.

As said, this is only part 1. We still need to address the iterators, but in 
my view that touches more places. I have done those changes, but they are not 
yet in shape to be pushed as a PR; I am open to feedback here. Thanks 
all. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ramkrish86/flink FLINK-3322_part1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2495


commit 705ee5294bc5263971c2924a55c9230d72806527
Author: Ramkrishna 
Date:   2016-09-13T06:33:59Z

FLINK-3322 - Make sorters to reuse the memory pages allocated for
iterative tasks




> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)


