[jira] [Updated] (FLINK-4627) Use Flink's PropertiesUtil in Kinesis connector to extract typed values from config properties

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4627:
---
Description: 
Right now, value extraction from config properties in the Kinesis connector is 
done with the plain methods of {{java.util.Properties}} plus string parsing.

We can use Flink's {{PropertiesUtil}} utility to do this with fewer lines of 
code and better readability.

  was:
Right now value extraction from config properties in the Kinesis connector is 
using the plain methods from {{java.util.Properties}} with type casting.

We can use Flink's {{PropertiesUtil}} utility to do this with fewer lines of 
code and better readability.


> Use Flink's PropertiesUtil in Kinesis connector to extract typed values from 
> config properties 
> ---
>
> Key: FLINK-4627
> URL: https://issues.apache.org/jira/browse/FLINK-4627
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Trivial
> Fix For: 1.2.0
>
>
> Right now, value extraction from config properties in the Kinesis connector is 
> done with the plain methods of {{java.util.Properties}} plus string parsing.
> We can use Flink's {{PropertiesUtil}} utility to do this with fewer lines 
> of code and better readability.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4627) Use Flink's PropertiesUtil in Kinesis connector to extract typed values from config properties

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4627:
--

 Summary: Use Flink's PropertiesUtil in Kinesis connector to 
extract typed values from config properties 
 Key: FLINK-4627
 URL: https://issues.apache.org/jira/browse/FLINK-4627
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai
Priority: Trivial
 Fix For: 1.2.0


Right now, value extraction from config properties in the Kinesis connector is 
done with the plain methods of {{java.util.Properties}} and type casting.

We can use Flink's {{PropertiesUtil}} utility to do this with fewer lines of 
code and better readability.
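
As a rough illustration of the intended change (the property key below is made 
up for the example, and the {{PropertiesUtil.getLong(Properties, String, long)}} 
helper signature is assumed rather than quoted from the codebase):

{code}
import java.util.Properties;
import org.apache.flink.util.PropertiesUtil;

public class PropertiesUtilExample {
    public static void main(String[] args) {
        Properties config = new Properties();

        // Before: plain java.util.Properties access plus manual string parsing
        long before = Long.parseLong(
                config.getProperty("example.discovery.interval.millis", "10000"));

        // After: typed extraction with a default value in a single call
        long after = PropertiesUtil.getLong(
                config, "example.discovery.interval.millis", 10000L);

        System.out.println(before + " == " + after);
    }
}
{code}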





[jira] [Commented] (FLINK-4561) replace all the scala version as a `scala.binary.version` property

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15495068#comment-15495068
 ] 

ASF GitHub Bot commented on FLINK-4561:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2459
  
@StephanEwen Thanks for your reply, I understand it.


> replace all the scala version as a `scala.binary.version` property
> --
>
> Key: FLINK-4561
> URL: https://issues.apache.org/jira/browse/FLINK-4561
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> Replace every hard-coded Scala version (2.10) with a `scala.binary.version` 
> property defined in the root pom's properties; the default value of the 
> property is 2.10. Changes (see the POM sketch after this description):
> 1. dependencies that include the Scala version 
> 2. module definitions that include the Scala version
> 3. upgrade the Scala version from 2.11.7 to 2.11.8
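
As a rough sketch of the idea (hand-written for this comment, not an excerpt 
from the actual Flink POMs), the property and a dependency using it might look 
like:

{code}
<!-- in the root pom.xml -->
<properties>
  <scala.binary.version>2.10</scala.binary.version>
</properties>

<!-- in a module: the artifact suffix picks up the property
     instead of a hard-coded _2.10 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
</dependency>
{code}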





[GitHub] flink issue #2459: [FLINK-4561] replace all the scala version as a `scala.bi...

2016-09-15 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2459
  
@StephanEwen Thanks for your reply, I understand it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4626) Missing break in MetricStore#add()

2016-09-15 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4626:
-

 Summary: Missing break in MetricStore#add()
 Key: FLINK-4626
 URL: https://issues.apache.org/jira/browse/FLINK-4626
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


{code}
  switch (info.getCategory()) {
case INFO_CATEGORY_JM:
  addMetric(jobManager.metrics, name, metric);
case INFO_CATEGORY_TM:
  String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) 
info).taskManagerID;
{code}
Looks like a {{break}} is missing after the {{addMetric(jobManager.metrics, name, metric)}} call, so the INFO_CATEGORY_JM case falls through into the INFO_CATEGORY_TM case.
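
A minimal sketch of what the fix would look like (illustrative only, not the 
actual patch):

{code}
switch (info.getCategory()) {
  case INFO_CATEGORY_JM:
    addMetric(jobManager.metrics, name, metric);
    break; // without this, the JM case falls through into the TM case
  case INFO_CATEGORY_TM:
    String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
    // ... add the metric to the matching task manager's store ...
    break;
  // remaining categories unchanged
}
{code}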





[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

2016-09-15 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79040795
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

For example, the various combine drivers seem to be easy to make 
resettable. Maybe this PR should just do that, so we maximize the chances that 
we can get the PR to be merged.




[jira] [Updated] (FLINK-4515) org.apache.flink.runtime.jobmanager.JobInfo class is not backwards compatible with 1.1 released version

2016-09-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-4515:
-
Component/s: JobManager

> org.apache.flink.runtime.jobmanager.JobInfo class is not backwards compatible 
> with 1.1 released version
> ---
>
> Key: FLINK-4515
> URL: https://issues.apache.org/jira/browse/FLINK-4515
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Nagarjun Guraja
>
> The current org.apache.flink.runtime.jobmanager.JobInfo in the 1.2 trunk is not 
> backwards compatible, which breaks job recovery when upgrading from the 1.1 
> release to the latest Flink build.
> 2016-08-26 13:39:56,618 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Attempting to recover all jobs.
> 2016-08-26 13:39:57,225 ERROR org.apache.flink.runtime.jobmanager.JobManager  
>   - Fatal error: Failed to recover jobs.
> java.io.InvalidClassException: org.apache.flink.runtime.jobmanager.JobInfo; 
> local class incompatible: stream classdesc serialVersionUID = 
> 4102282956967236682, local class serialVersionUID = -2377916285980374169
> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at 
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
> at 
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> at 
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:543)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:539)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:539)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:539)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:535)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:535)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
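
One common way to restore compatibility in situations like this, shown here 
purely as an illustration of the mechanism (the class below is a Java stand-in, 
while the real JobInfo is a Scala class, and this may not be the fix the 
project chooses): pin {{serialVersionUID}} to the value the 1.1 release wrote, 
which is visible in the stack trace above.

{code}
import java.io.Serializable;

public class JobInfo implements Serializable {
    // Value taken from "stream classdesc serialVersionUID" in the exception above,
    // so that state serialized by the 1.1 release can still be deserialized.
    private static final long serialVersionUID = 4102282956967236682L;

    // ... fields and methods as before ...
}
{code}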





[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494273#comment-15494273
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79040795
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

For example, the various combine drivers seem to be easy to make 
resettable. Maybe this PR should just do that, so we maximize the chances that 
we can get the PR to be merged.


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that individually can be committed and for better 
> closer reviews.





[jira] [Updated] (FLINK-4515) org.apache.flink.runtime.jobmanager.JobInfo class is not backwards compatible with 1.1 released version

2016-09-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-4515:
-
Affects Version/s: 1.2.0

> org.apache.flink.runtime.jobmanager.JobInfo class is not backwards compatible 
> with 1.1 released version
> ---
>
> Key: FLINK-4515
> URL: https://issues.apache.org/jira/browse/FLINK-4515
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0
>Reporter: Nagarjun Guraja
>
> The current org.apache.flink.runtime.jobmanager.JobInfo in the 1.2 trunk is not 
> backwards compatible, which breaks job recovery when upgrading from the 1.1 
> release to the latest Flink build.
> 2016-08-26 13:39:56,618 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Attempting to recover all jobs.
> 2016-08-26 13:39:57,225 ERROR org.apache.flink.runtime.jobmanager.JobManager  
>   - Fatal error: Failed to recover jobs.
> java.io.InvalidClassException: org.apache.flink.runtime.jobmanager.JobInfo; 
> local class incompatible: stream classdesc serialVersionUID = 
> 4102282956967236682, local class serialVersionUID = -2377916285980374169
> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at 
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
> at 
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> at 
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:543)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:539)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:539)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:539)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:535)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:535)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)





[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494252#comment-15494252
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79038567
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

Currently, the `ResettableDriver` interface extends the `Driver` interface. 
I would say that for the moment, let's just start by making some of the drivers 
resettable in this PR, and keep both interfaces around. And then when this PR 
is merged, a second PR could make all the remaining drivers resettable as well. 
And then a third one could merge the functionality of `ResettableDriver` into 
`Driver`, and thus get rid of `ResettableDriver`.
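
For readers following along, a rough sketch of the relationship being discussed 
(member names are simplified illustrations, not copied from the real interfaces):

{code}
// Base contract that every runtime operator driver implements (simplified).
interface Driver<S, OT> {
    void run() throws Exception;
    void cancel() throws Exception;
}

// ResettableDriver adds a hook so the driver can be reused across iteration
// supersteps instead of being torn down and re-created each time.
interface ResettableDriver<S, OT> extends Driver<S, OT> {
    void reset() throws Exception;
}
{code}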


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that individually can be committed and for better 
> closer reviews.





[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

2016-09-15 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79038567
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

Currently, the `ResettableDriver` interface extends the `Driver` interface. 
I would say that for the moment, let's just start by making some of the drivers 
resettable in this PR, and keep both interfaces around. And then when this PR 
is merged, a second PR could make all the remaining drivers resettable as well. 
And then a third one could merge the functionality of `ResettableDriver` into 
`Driver`, and thus get rid of `ResettableDriver`.




[jira] [Commented] (FLINK-4625) Guard Flink processes against blocking shutdown hooks

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494110#comment-15494110
 ] 

ASF GitHub Bot commented on FLINK-4625:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2503

[FLINK-4625] [core] Add a safety net to forcibly terminate JVM if clean 
shutdown freezes.

Resource managers like YARN send the JVM the `SIGTERM` signal to kill the 
process.

With `SIGTERM`, the JVM shutdown hooks run, and may cause the process to 
freeze up during shutdown. Especially since all dependencies (like Hadoop) may 
install shutdown hooks (and do so), it is not in Flink's control to make sure 
all shutdown hooks are well behaved and never lock the JVM shutdown.

This pull request adds a shutdown hook that calls `Runtime.halt()` after a 
delay. This forcibly terminates the JVM if clean shutdown does not succeed 
within a certain time (default is five seconds).

The pull request also adds tests that validate the behavior of JVM shutdown 
lockups and that the safety net ensures the process really shuts down.
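
For illustration, a minimal, self-contained sketch of the pattern described 
above (the class and names are made up for this sketch; it is not the code 
added by this PR):

{code}
/** Shutdown hook that starts a delayed, forcible JVM termination. */
public final class ShutdownWatchdog extends Thread {

    private final long delayMillis;

    private ShutdownWatchdog(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    @Override
    public void run() {
        // Runs as a shutdown hook: spawn a daemon thread that halts the JVM
        // after the delay, then return so the regular shutdown can proceed.
        Thread terminator = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException ignored) {
                // fall through and halt anyway
            }
            // halt() skips the remaining shutdown hooks and finalizers
            Runtime.getRuntime().halt(-17);
        }, "delayed-jvm-terminator");
        terminator.setDaemon(true);
        terminator.start();
    }

    /** Installs the watchdog with the five second default mentioned above. */
    public static void install() {
        Runtime.getRuntime().addShutdownHook(new ShutdownWatchdog(5_000L));
    }
}
{code}

If the clean shutdown finishes within the delay, the daemon terminator thread 
dies with the JVM and halt() is never called; otherwise the process is killed 
forcibly.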

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink process_self_kill

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2503.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2503


commit d5b9860773ec7aaf0b238544b794a10012d8dda5
Author: Stephan Ewen 
Date:   2016-09-15T17:27:06Z

[FLINK-4625] [core] Add a safety net to forcibly terminate JVM if clean 
shutdown freezes.




> Guard Flink processes against blocking shutdown hooks
> -
>
> Key: FLINK-4625
> URL: https://issues.apache.org/jira/browse/FLINK-4625
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> Resource managers like YARN send the JVM the {{SIGTERM}} signal to kill the 
> process when they want to terminate it.
> With {{SIGTERM}}, the JVM shutdown hooks run, and may cause the process to 
> freeze up on shutdown. Especially since all dependencies (like Hadoop) may 
> install shutdown hooks (and do so), it is not in Flink's control to make sure 
> all shutdown hooks are well behaved.
> I propose to add a guard that forcibly terminates the JVM if clean shutdown 
> does not succeed within a certain time (say five seconds).





[GitHub] flink pull request #2503: [FLINK-4625] [core] Add a safety net to forcibly t...

2016-09-15 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2503

[FLINK-4625] [core] Add a safety net to forcibly terminate JVM if clean 
shutdown freezes.

Resource managers like YARN send the JVM the `SIGTERM` signal to kill the 
process.

With `SIGTERM`, the JVM shutdown hooks run, and may cause the process to 
freeze up during shutdown. Especially since all dependencies (like Hadoop) may 
install shutdown hooks (and do so), it is not in Flink's control to make sure 
all shutdown hooks are well behaved and never lock the JVM shutdown.

This pull request adds a shutdown hook that calls `Runtime.halt()` after a 
delay. This forcibly terminates the JVM if clean shutdown does not succeed 
within a certain time (default is five seconds).

The pull request also adds tests that validate the behavior of JVM shutdown 
lockups and that the safety net ensures the process really shuts down.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink process_self_kill

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2503.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2503


commit d5b9860773ec7aaf0b238544b794a10012d8dda5
Author: Stephan Ewen 
Date:   2016-09-15T17:27:06Z

[FLINK-4625] [core] Add a safety net to forcibly terminate JVM if clean 
shutdown freezes.






[GitHub] flink issue #2498: [FLINK-4619] - JobManager does not answer to client when ...

2016-09-15 Thread mproch
Github user mproch commented on the issue:

https://github.com/apache/flink/pull/2498
  
Yes, it's a bit more tricky than I expected... These tests probably also 
need to be changed, because they relied on the previous behaviour... What's more, 
with the change some checks on FlinkMiniCluster become 
non-deterministic... I'll commit fixes once I fully understand how it 
works...




[jira] [Commented] (FLINK-4619) JobManager does not answer to client when restore from savepoint fails

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494099#comment-15494099
 ] 

ASF GitHub Bot commented on FLINK-4619:
---

Github user mproch commented on the issue:

https://github.com/apache/flink/pull/2498
  
Yes, it's a bit more tricky than I expected... These tests probably also 
need to be changed, because they relied on the previous behaviour... What's more, 
with the change some checks on FlinkMiniCluster become 
non-deterministic... I'll commit fixes once I fully understand how it 
works...


> JobManager does not answer to client when restore from savepoint fails
> --
>
> Key: FLINK-4619
> URL: https://issues.apache.org/jira/browse/FLINK-4619
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: Maciej Prochniak
> Fix For: 1.2.0, 1.1.3
>
>
> When the savepoint used is incompatible with the currently deployed program, the job 
> manager never answers the client (jobInfo.notifyClients is not invoked in one of the 
> try-catch blocks)
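
Schematically, the kind of fix the description points at looks roughly like the 
following (Java-style sketch; restoreFromSavepoint and the failure message type 
are hypothetical names, and the real code lives in the Scala JobManager):

{code}
try {
    restoreFromSavepoint(savepointPath, executionGraph);
} catch (Exception e) {
    // Previously this failure path returned without answering the client,
    // so the submitting client waited forever. Notify it before giving up.
    jobInfo.notifyClients(new SubmissionFailure(jobId, e)); // hypothetical message type
    throw e;
}
{code}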





[jira] [Commented] (FLINK-4619) JobManager does not answer to client when restore from savepoint fails

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494086#comment-15494086
 ] 

ASF GitHub Bot commented on FLINK-4619:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2498
  
Good idea. Unfortunately, the change broke some tests:

  - `SavepointITCase.testSubmitWithUnknownSavepointPath`
  - `RescalingITCase.testSavepointRescalingFailureWithNonPartitionedState`

You can see more in the Travis CI report.


> JobManager does not answer to client when restore from savepoint fails
> --
>
> Key: FLINK-4619
> URL: https://issues.apache.org/jira/browse/FLINK-4619
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: Maciej Prochniak
> Fix For: 1.2.0, 1.1.3
>
>
> When the savepoint used is incompatible with the currently deployed program, the job 
> manager never answers the client (jobInfo.notifyClients is not invoked in one of the 
> try-catch blocks)





[GitHub] flink issue #2498: [FLINK-4619] - JobManager does not answer to client when ...

2016-09-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2498
  
Good idea. Unfortunately, the change broke some tests:

  - `SavepointITCase.testSubmitWithUnknownSavepointPath`
  - `RescalingITCase.testSavepointRescalingFailureWithNonPartitionedState`

You can see more in the Travis CI report.




[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494054#comment-15494054
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79021152
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

So can we have only ResettableDriver, and no need for the Driver interface 
then?


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that individually can be committed and for better 
> closer reviews.





[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

2016-09-15 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79021152
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

So can we have only ResettableDriver, and no need for the Driver interface 
then?




[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494047#comment-15494047
 ] 

ASF GitHub Bot commented on FLINK-4485:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2499
  
Looks good to me.

+1 to merge


> Finished jobs in yarn session fill /tmp filesystem
> --
>
> Key: FLINK-4485
> URL: https://issues.apache.org/jira/browse/FLINK-4485
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.1.0
>Reporter: Niels Basjes
>Assignee: Maximilian Michels
>Priority: Blocker
>
> On a Yarn cluster I start a yarn-session with a few containers and task slots.
> Then I fire a 'large' number of Flink batch jobs in sequence against this 
> yarn session. It is the exact same job (java code) yet it gets different 
> parameters.
> In this scenario it is exporting HBase tables to files in HDFS, and the 
> parameters specify which data from which tables to export and the name of the 
> target directory.
> After running several dozen jobs, the job submissions started to fail and we 
> investigated.
> We found that the cause was that on the Yarn node which was hosting the 
> jobmanager the /tmp file system was full (4GB was 100% full).
> However, the output of {{du -hcs /tmp}} showed only 200MB in use.
> We found that a very large file (we guess it is the jar of the job) was put 
> in /tmp, used, and deleted, yet the file handle was not closed by the jobmanager.
> As soon as we killed the jobmanager the disk space was freed.
> In summary, a yarn-session that receives enough 
> jobs brings down the Yarn node for all users.
> See parts of the output we got from {{lsof}} below.
> {code}
> COMMAND PID  USER   FD  TYPE DEVICE  SIZE   
> NODE NAME
> java  15034   nbasjes  550r  REG 253,17  66219695
> 245 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 
> (deleted)
> java  15034   nbasjes  551r  REG 253,17  66219695
> 252 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 
> (deleted)
> java  15034   nbasjes  552r  REG 253,17  66219695
> 267 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 
> (deleted)
> java  15034   nbasjes  553r  REG 253,17  66219695
> 250 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 
> (deleted)
> java  15034   nbasjes  554r  REG 253,17  66219695
> 288 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 
> (deleted)
> java  15034   nbasjes  555r  REG 253,17  66219695
> 298 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0025 
> (deleted)
> java  15034   nbasjes  557r  REG 253,17  66219695
> 254 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0008 
> (deleted)
> java  15034   nbasjes  558r  REG 253,17  66219695
> 292 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0019 
> (deleted)
> java  15034   nbasjes  559r  REG 253,17  66219695
> 275 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0013 
> (deleted)
> java  15034   nbasjes  560r  REG 253,17  66219695
> 159 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0002 
> (deleted)
> java  15034   nbasjes  562r  REG 253,17  66219695
> 238 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0001 
> (deleted)
> java  15034   nbasjes  568r  REG 253,17  66219695
> 246 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0004 
> (deleted)
> java  15034   nbasjes  569r  REG 253,17  66219695
> 255 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0009 
> (deleted)
> java  15034   nbasjes  571r  REG 253,17  66219695
> 299 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0026 
> (deleted)
> java  15034   nbasjes  572r  REG 253,17  66219695
> 293 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0020 
> (deleted)
> java  15034   nbasjes  574r  REG 253,17  66219695
> 256 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0010 
> (deleted)
> java  15034   nbasjes  575r  REG 253,17  66219695
> 302 
> 

[GitHub] flink issue #2499: [FLINK-4485] close and remove user class loader after job...

2016-09-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2499
  
Looks good to me.

+1 to merge




[GitHub] flink issue #2501: [FLINK-4622] CLI help message should include 'savepoint' ...

2016-09-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2501
  
Thanks for the correction.

+1 to merge this!




[jira] [Commented] (FLINK-4622) CLI help message should include 'savepoint' action

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494044#comment-15494044
 ] 

ASF GitHub Bot commented on FLINK-4622:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2501
  
Thanks for the correction.

+1 to merge this!


> CLI help message should include 'savepoint' action
> --
>
> Key: FLINK-4622
> URL: https://issues.apache.org/jira/browse/FLINK-4622
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.1.2
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>Priority: Trivial
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The Flink CLI help message should include the 'savepoint' action in the list 
> of available actions. It currently looks like:
> {code}
> bash-4.3# flink foo
> "foo" is not a valid action.
> Valid actions are "run", "list", "info", "stop", or "cancel".
> Specify the version option (-v or --version) to print Flink version.
> Specify the help option (-h or --help) to get help on the command.
> {code}
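
For comparison, the corrected message would presumably list the missing action 
as well, e.g. (illustrative only, not the actual output of the patched CLI):

{code}
Valid actions are "run", "list", "info", "savepoint", "stop", or "cancel".
{code}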





[jira] [Created] (FLINK-4625) Guard Flink processes against blocking shutdown hooks

2016-09-15 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4625:
---

 Summary: Guard Flink processes against blocking shutdown hooks
 Key: FLINK-4625
 URL: https://issues.apache.org/jira/browse/FLINK-4625
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.1.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.2.0


Resource managers like YARN send the JVM the {{SIGTERM}} signal to kill the 
process when they want to terminate it.

With {{SIGTERM}}, the JVM shutdown hooks run, and may cause the process to 
freeze up on shutdown. Especially since all dependencies (like Hadoop) may 
install shutdown hooks (and do so), it is not in Flink's control to make sure 
all shutdown hooks are well behaved.

I propose to add a guard that forcibly terminates the JVM if clean shutdown 
does not succeed within a certain time (say five seconds).





[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494022#comment-15494022
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79017956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

OK. Let me update the code accordingly; of course it is simpler then. My 
only concern was that I was not sure whether every driver can be made a 
ResettableDriver, since it was a special case for certain drivers. 


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that individually can be committed and for better 
> closer reviews.





[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

2016-09-15 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79017956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

OK. Let me update the code accordingly; of course it is simpler then. My 
only concern was that I was not sure whether every driver can be made a 
ResettableDriver, since it was a special case for certain drivers. 




[jira] [Closed] (FLINK-4389) Expose metrics to Webfrontend

2016-09-15 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-4389.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

Implemented in 70704de0c82cbb7b143dd696221e11999feb3600

> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.2.0, pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface





[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494002#comment-15494002
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2363


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache, 1.2.0
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface





[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-09-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2363




[jira] [Commented] (FLINK-4589) Fix Merging of Covering Window in MergingWindowSet

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493989#comment-15493989
 ] 

ASF GitHub Bot commented on FLINK-4589:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2476
  
Looks good to me, merging this...


> Fix Merging of Covering Window in MergingWindowSet
> --
>
> Key: FLINK-4589
> URL: https://issues.apache.org/jira/browse/FLINK-4589
> Project: Flink
>  Issue Type: Bug
>  Components: Windowing Operators
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.0.4, 1.2.0, 1.1.3
>
>
> Right now, when a new window gets merged that covers all of the existing 
> windows, {{MergingWindowSet}} does not correctly set the state window.





[GitHub] flink issue #2476: [FLINK-4589] Fix Merging of Covering Window in MergingWin...

2016-09-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2476
  
Looks good to me, merging this...




[jira] [Commented] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

2016-09-15 Thread Matthew Barlocker (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493956#comment-15493956
 ] 

Matthew Barlocker commented on FLINK-4618:
--

Nice find [~melmoth] and [~tzulitai] - I don't know the code well enough to 
feel confident making the change or testing the fix. Please feel free. I 
started learning Flink about a week ago, and have put a solid 3 hours into it. 

> FlinkKafkaConsumer09 should start from the next record on startup from 
> offsets in Kafka
> ---
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
> Fix For: 1.2.0, 1.1.3
>
>
> **Original reported ticket title: Last kafka message gets consumed twice when 
> restarting job**
> There seems to be an issue with the offset management in Flink. When a job is 
> stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools; the committed offset in 
> ZooKeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
> offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO  

[jira] [Comment Edited] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493927#comment-15493927
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4618 at 9/15/16 5:04 PM:
-

I've confirmed that the problem only exists in the 0.9 consumer.
[~melmoth] and [~mbarlocker], since you were the original reporters of the bug, 
would any one of you want to work on fixing it and open a PR? You can ping me 
to help review when you're ready :)
Otherwise I can also pick it up. In any case, let me know ;)


was (Author: tzulitai):
I've confirmed that the problem only exists in the 0.9 consumer.
[~melmoth] and [~mbarlocker], since you were the original reporters of the bug, 
would you want to work on fixing it and open a PR? You can ping me to help 
review when you're ready :)
Otherwise I can also pick it up. In any case, let me know ;)
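
For whoever picks this up, a rough sketch of the idea in the ticket title 
(illustrative only, using the plain Kafka 0.9 consumer API rather than the 
actual connector code): when the start position comes from offsets committed 
to Kafka/ZooKeeper, resume at the record after the committed one.

{code}
// consumer: org.apache.kafka.clients.consumer.KafkaConsumer<byte[], byte[]>
// 'restoredOffsets' holds the offsets committed by the previous run, i.e. the
// offsets of the last records that were already emitted downstream.
for (Map.Entry<TopicPartition, Long> entry : restoredOffsets.entrySet()) {
    long committed = entry.getValue();
    // seek to committed + 1 so the last emitted record is not read a second time
    consumer.seek(entry.getKey(), committed + 1);
}
{code}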

> FlinkKafkaConsumer09 should start from the next record on startup from 
> offsets in Kafka
> ---
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
> Fix For: 1.2.0, 1.1.3
>
>
> **Original reported ticket title: Last kafka message gets consumed twice when 
> restarting job**
> There seems to be an issue with the offset management in Flink. When a job is 
> stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools; the committed offset in 
> ZooKeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, 

[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493950#comment-15493950
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4617:


Revisited and reconfirmed that this is actually a bug in the 0.9 consumer 
(sorry for mis-concluding in the first place). Please see FLINK-4618.

> Kafka & Flink duplicate messages on restart
> ---
>
> Key: FLINK-4617
> URL: https://issues.apache.org/jira/browse/FLINK-4617
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
> Environment: Ubuntu 16.04
> Flink 1.1.*
> Kafka 0.9.0.1
> Scala 2.11.7
> Java 1.8.0_91
>Reporter: Matthew Barlocker
>Priority: Critical
>
> [StackOverflow 
> Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]
> Flink (the kafka connector) re-runs the last 3-9 messages it saw before it 
> was shut down.
> *My code:*
> {code}
> import java.util.Properties
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.CheckpointingMode
> import org.apache.flink.streaming.connectors.kafka._
> import org.apache.flink.streaming.util.serialization._
> import org.apache.flink.runtime.state.filesystem._
> object Runner {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(500)
> env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
> 
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "testing");
> val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new 
> SimpleStringSchema(), properties)
> val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", 
> "testing-out", new SimpleStringSchema())
> env.addSource(kafkaConsumer)
>   .addSink(kafkaProducer)
> env.execute()
>   }
> }
> {code}
> *My sbt dependencies:*
> {code}
> libraryDependencies ++= Seq(
> "org.apache.flink" %% "flink-scala" % "1.1.2",
> "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
> "org.apache.flink" %% "flink-clients" % "1.1.2",
> "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
> "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
> )
> {code}
> *My process:*
> using 3 terminals:
> {code}
> TERM-1 start sbt, run program
> TERM-2 create kafka topics testing-in and testing-out
> TERM-2 run kafka-console-producer on testing-in topic
> TERM-3 run kafka-console-consumer on testing-out topic
> TERM-2 send data to kafka producer.
> Wait for a couple seconds (buffers need to flush)
> TERM-3 watch data appear in testing-out topic
> Wait for at least 500 milliseconds for checkpointing to happen
> TERM-1 stop sbt
> TERM-1 run sbt
> TERM-3 watch last few lines of data appear in testing-out topic
> {code}
> *My expectations:*
> When there are no errors in the system, I expect to be able to turn flink on 
> and off without reprocessing messages that successfully completed the stream 
> in a prior run.
> *My attempts to fix:*
> I've added the call to setStateBackend, thinking that perhaps the default 
> memory backend just didn't remember correctly. That didn't seem to help.
> I've removed the call to enableCheckpointing, hoping that perhaps there was a 
> separate mechanism to track state in Flink vs Zookeeper. That didn't seem to 
> help.
> I've used different sinks, RollingFileSink, print(); hoping that maybe the 
> bug was in kafka. That didn't seem to help.
> I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
> maybe the bug was in the latest version. That didn't seem to help.
> I've added the zookeeper.connect config to the properties object, hoping that 
> the comment about it only being useful in 0.8 was wrong. That didn't seem to 
> help.
> I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea 
> drfloob). That didn't seem to help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4624) Gelly's summarization algorithm cannot deal with null vertex group values

2016-09-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4624:


 Summary: Gelly's summarization algorithm cannot deal with null 
vertex group values
 Key: FLINK-4624
 URL: https://issues.apache.org/jira/browse/FLINK-4624
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Till Rohrmann
 Fix For: 1.2.0


Gelly's {{Summarization}} algorithm cannot handle null values in 
{{VertexGroupItem.f2}}. This behaviour is hidden by using Strings as the vertex 
value in the {{SummarizationITCase}}, because the {{StringSerializer}} can 
handle null values.
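
A minimal way to make the masking visible is to probe whether a given serializer 
tolerates a null record. The sketch below is only illustrative (the class name 
and the use of {{DataOutputSerializer}} as a throwaway target are my own 
choices, not part of this ticket); per the description above, the String 
serializer is expected to tolerate null while e.g. the Long serializer is not:

{code}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class NullSerializationProbe {

	// returns true if the serializer accepts a null record without throwing
	static <T> boolean toleratesNull(TypeSerializer<T> serializer) {
		try {
			serializer.serialize(null, new DataOutputSerializer(64));
			return true;
		} catch (Exception | Error e) {
			return false;
		}
	}

	public static void main(String[] args) {
		System.out.println("String: " + toleratesNull(StringSerializer.INSTANCE));
		System.out.println("Long:   " + toleratesNull(LongSerializer.INSTANCE));
	}
}
{code}

A regression test could run {{Summarization}} with a non-String vertex value so 
that the null in the vertex group item actually reaches such a serializer.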



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4618:
---
Fix Version/s: 1.1.3
   1.2.0

> FlinkKafkaConsumer09 should start from the next record on startup from 
> offsets in Kafka
> ---
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
> Fix For: 1.2.0, 1.1.3
>
>
> **Original reported ticket title: Last kafka message gets consumed twice when 
> restarting job**
> There seem to be an issue with the offset management in Flink. When a job is 
> stopped and startet again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can cleary see, that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools, the commited offset in 
> zookeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
> offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Stopped BLOB server at 0.0.0.0:2946
> -- Restarting job
> 10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating 

[jira] [Commented] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493927#comment-15493927
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4618:


I've confirmed that the problem only exists in the 0.9 consumer.
[~melmoth] and [~mbarlocker], since you were the original reporters of the bug, 
would you want to work on fixing it and open a PR? You can ping me to help 
review when you're ready :)
Otherwise I can also pick it up. In any case, let me know ;)

> FlinkKafkaConsumer09 should start from the next record on startup from 
> offsets in Kafka
> ---
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
>
> **Original reported ticket title: Last kafka message gets consumed twice when 
> restarting job**
> There seem to be an issue with the offset management in Flink. When a job is 
> stopped and startet again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can cleary see, that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools, the commited offset in 
> zookeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
> offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition 

[jira] [Updated] (FLINK-4618) Kafka Consumer 0.9 should start from the next record on startup from Kafka offsets

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4618:
---
Summary: Kafka Consumer 0.9 should start from the next record on startup 
from Kafka offsets  (was: Kafka Consumer 0.9 should start from the next record 
when startup from Kafka offsets)

> Kafka Consumer 0.9 should start from the next record on startup from Kafka 
> offsets
> --
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
>
> **Original reported ticket title: Last kafka message gets consumed twice when 
> restarting job**
> There seem to be an issue with the offset management in Flink. When a job is 
> stopped and startet again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can cleary see, that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools, the commited offset in 
> zookeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
> offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Stopped BLOB server at 0.0.0.0:2946
> -- Restarting job
> 

[jira] [Updated] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4618:
---
Summary: FlinkKafkaConsumer09 should start from the next record on startup 
from offsets in Kafka  (was: Kafka Consumer 0.9 should start from the next 
record on startup from offsets in Kafka)

> FlinkKafkaConsumer09 should start from the next record on startup from 
> offsets in Kafka
> ---
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
>
> **Original reported ticket title: Last kafka message gets consumed twice when 
> restarting job**
> There seem to be an issue with the offset management in Flink. When a job is 
> stopped and startet again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can cleary see, that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools, the commited offset in 
> zookeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
> offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Stopped BLOB server at 0.0.0.0:2946
> -- 

[jira] [Updated] (FLINK-4618) Kafka Consumer 0.9 should start from the next record on startup from offsets in Kafka

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4618:
---
Summary: Kafka Consumer 0.9 should start from the next record on startup 
from offsets in Kafka  (was: Kafka Consumer 0.9 should start from the next 
record on startup from Kafka offsets)

> Kafka Consumer 0.9 should start from the next record on startup from offsets 
> in Kafka
> -
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
>
> **Original reported ticket title: Last kafka message gets consumed twice when 
> restarting job**
> There seem to be an issue with the offset management in Flink. When a job is 
> stopped and startet again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can cleary see, that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools, the commited offset in 
> zookeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
> offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Stopped BLOB server at 0.0.0.0:2946
> -- Restarting job

[jira] [Updated] (FLINK-4618) Kafka Consumer 0.9 should start from the next record when startup from Kafka offsets

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4618:
---
Description: 
**Original reported ticket title: Last kafka message gets consumed twice when 
restarting job**

There seems to be an issue with the offset management in Flink. When a job is 
stopped and started again, a message from the previous offset is read again.
I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
consumer group and emitted one record.

You can clearly see that the consumer waits for a new record at offset 4848911, 
which is correct. After restarting, it consumes a record at 4848910, causing 
the record to be consumed more than once.

I checked the offset with the Kafka CMD tools; the committed offset in 
ZooKeeper is 4848910.

Here is my log output:

{code}
10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient   
 - Initiating connection to node 2147482646 at hdp1:6667.
10:29:24,225 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
committed offsets for partitions: [myTopic-0]
10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient   
 - Completed connection to node 2147482646
10:29:24,234 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No committed 
offset for partition myTopic-0
10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  
 - Resetting offset for partition myTopic-0 to latest offset.
10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  
 - Fetched offset 4848910 for partition myTopic-0
10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Added fetch request for partition myTopic-0 at offset 4848910
10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Added fetch request for partition myTopic-0 at offset 4848910
10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Added fetch request for partition myTopic-0 at offset 4848910

-- Inserting a new event here

10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Adding fetched record for partition myTopic-0 with offset 4848910 to 
buffered record list
10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Returning fetched records at offset 4848910 for assigned partition myTopic-0 
and update position to 4848911
10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Added fetch request for partition myTopic-0 at offset 4848911
10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Added fetch request for partition myTopic-0 at offset 4848911
10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Added fetch request for partition myTopic-0 at offset 4848911
10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
 - Triggering checkpoint 6 @ 1473841823887
10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Added fetch request for partition myTopic-0 at offset 4848911
10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
 - Completed checkpoint 6 (in 96 ms)
10:30:24,196 TRACE 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
metadata=''}} to Node(2147482646, hdp1, 6667)
10:30:24,204 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
offset 4848910 for partition myTopic-0
10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Added fetch request for partition myTopic-0 at offset 4848911
10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Added fetch request for partition myTopic-0 at offset 4848911
10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer 
 - Stopped BLOB server at 0.0.0.0:2946

-- Restarting job

10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient   
 - Initiating connection to node 2147482646 at hdp1:6667.
10:32:01,673 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
committed offsets for partitions: [myTopic-0]
10:32:01,677 DEBUG org.apache.kafka.clients.NetworkClient   
 - Completed connection to node 2147482646
// See below! Shouldn't the offset be 4848911?
10:32:01,682 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  
 - Resetting offset for partition myTopic-0 to the committed offset 4848910
10:32:01,683 TRACE org.apache.kafka.clients.consumer.internals.Fetcher  
 - Added fetch request for partition myTopic-0 at offset 4848910
10:32:01,685 DEBUG org.apache.kafka.clients.NetworkClient  

[jira] [Updated] (FLINK-4618) Kafka Consumer 0.9 should start from the next record when startup from Kafka offsets

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4618:
---
Summary: Kafka Consumer 0.9 should start from the next record when startup 
from Kafka offsets  (was: Last kafka message gets consumed twice when 
restarting job)

> Kafka Consumer 0.9 should start from the next record when startup from Kafka 
> offsets
> 
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
>
> There seem to be an issue with the offset management in Flink. When a job is 
> stopped and startet again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can cleary see, that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools, the commited offset in 
> zookeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
> offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Stopped BLOB server at 0.0.0.0:2946
> -- Restarting job
> 10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 

[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493897#comment-15493897
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79010283
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

> You mean all drivers will now become a type of ResettableDriver?

Yes, this was the idea in my comment [1] on the Jira. I'm sorry if I wasn't 
clear enough there.
The advantage of this is that you wouldn't need to add more logic in 
`BatchTask`, `AbstractIterativeTask` and its descendants, because these already 
handle the situation when the driver is an instance of `ResettableDriver`. (See 
e.g. the beginning of `reinstantiateDriver`.)

[1] 
https://issues.apache.org/jira/browse/FLINK-3322?focusedCommentId=15474531=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15474531
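
For reference, a rough sketch of the dispatch that the task side already does 
(names and structure simplified; this is not the actual {{BatchTask}} code, 
just an illustration of why reusing {{ResettableDriver}} avoids new methods on 
{{Driver}}):

{code}
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.ResettableDriver;

final class DriverResetSketch {

	// Resettable drivers keep their resources and are simply reset between
	// supersteps; all other drivers are cleaned up and re-instantiated, which
	// is exactly the reallocation this change tries to avoid.
	static void prepareNextSuperstep(Driver<?, ?> driver) throws Exception {
		if (driver instanceof ResettableDriver) {
			((ResettableDriver<?, ?>) driver).reset();
		} else {
			driver.cleanup();
		}
	}
}
{code}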


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that individually can be committed and for better 
> closer reviews.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (FLINK-4618) Last kafka message gets consumed twice when restarting job

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reopened FLINK-4618:


> Last kafka message gets consumed twice when restarting job
> --
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
>
> There seem to be an issue with the offset management in Flink. When a job is 
> stopped and startet again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can cleary see, that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools, the commited offset in 
> zookeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
> offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:48,057 INFO  org.apache.flink.runtime.blob.BlobServer   
>- Stopped BLOB server at 0.0.0.0:2946
> -- Restarting job
> 10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:32:01,673 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:32:01,677 DEBUG 

[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

2016-09-15 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79010283
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

> You mean all drivers will now become a type of ResettableDriver?

Yes, this was the idea in my comment [1] on the Jira. I'm sorry if I wasn't 
clear enough there.
The advantage of this is that you wouldn't need to add more logic in 
`BatchTask`, `AbstractIterativeTask` and its descendants, because these already 
handle the situation when the driver is an instance of `ResettableDriver`. (See 
e.g. the beginning of `reinstantiateDriver`.)

[1] 
https://issues.apache.org/jira/browse/FLINK-3322?focusedCommentId=15474531=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15474531


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493858#comment-15493858
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79007600
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

I saw that code, but I am not sure how to do that. The driver is actually 
instantiated from the config, so how do we ensure that it is of type 
ResettableDriver? Do you mean all drivers will now become a type of 
ResettableDriver?


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that individually can be committed and for better 
> closer reviews.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2495: FLINK-3322 - Make sorters to reuse the memory pages alloc...

2016-09-15 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2495
  
@ggevay and @StephanEwen - any feedback or reviews here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493859#comment-15493859
 ] 

ASF GitHub Bot commented on FLINK-3322:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2495
  
@ggevay and @StephanEwen - any feedback or reviews here?


> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
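
As a side note on the configuration mentioned above: one mitigation (not the 
fix this ticket is after) is to enable pre-allocation so that managed memory 
segments are pooled instead of being handed to the GC. A minimal sketch for a 
local environment, assuming the standard {{taskmanager.memory.preallocate}} 
key; on a cluster the same key would go into flink-conf.yaml:

{code}
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class PreallocateSketch {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// pool managed memory segments up front instead of leaving freed
		// segments to the garbage collector
		conf.setBoolean("taskmanager.memory.preallocate", true);
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
		// ... build and execute the iterative job on 'env' ...
	}
}
{code}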



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

2016-09-15 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79007600
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

I saw that code, but I am not sure how to do that. The driver is actually 
instantiated from the config, so how do we ensure that it is of type 
ResettableDriver? Do you mean all drivers will now become a type of 
ResettableDriver?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4618) Last kafka message gets consumed twice when restarting job

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493845#comment-15493845
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4618:


Hi [~melmoth],
I've revisited this issue, and I think this may actually be a bug.
As you mentioned on SO, the Kafka consumer is either committing one offset less 
back to ZK than it should, or it should start from the next record when 
starting from offsets found in ZK.
Note that this still has nothing to do with Flink's exactly-once guarantee; 
it is simply that the consumer isn't starting at the right place when starting 
from offsets found in ZK. The savepoint / exactly-once descriptions I gave 
before still hold. I'm sorry for drawing the wrong conclusion on this bug in 
the first place.

I'll update information in this ticket once I confirm the bug! 
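
To make the suspected off-by-one concrete: in Kafka's consumer API the 
committed offset is conventionally the position of the *next* record to read, 
i.e. last processed offset + 1. A minimal sketch of that convention with a 
plain 0.9 {{KafkaConsumer}} (topic and partition are placeholders, and this is 
not the connector's actual commit code):

{code}
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetCommitSketch {
	static void commitAfterCheckpoint(KafkaConsumer<?, ?> consumer, long lastProcessedOffset) {
		TopicPartition tp = new TopicPartition("myTopic", 0);
		// commit the offset of the next record to read, not the last one processed
		consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastProcessedOffset + 1)));
	}
}
{code}

With that convention, committing 4848911 after processing 4848910 would make a 
restarted job resume at the right record.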

> Last kafka message gets consumed twice when restarting job
> --
>
> Key: FLINK-4618
> URL: https://issues.apache.org/jira/browse/FLINK-4618
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.2
> Environment: Flink 1.1.2
> Kafka Broker 0.10.0
> Hadoop 2.7.0
>Reporter: Melmoth
>
> There seem to be an issue with the offset management in Flink. When a job is 
> stopped and startet again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new 
> consumer group and emitted one record.
> You can cleary see, that the consumer waits for a new record at offset 
> 4848911, which is correct. After restarting, it consumes a record at 4848910, 
> causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools, the commited offset in 
> zookeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient 
>- Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Fetching 
> committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient 
>- Completed connection to node 2147482646
> 10:29:24,234 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - No 
> committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher
>- Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Adding fetched record for partition myTopic-0 with offset 4848910 to 
> buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Returning fetched records at offset 4848910 for assigned partition 
> myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
>- Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>- Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
> offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, 
> metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Committed 
> offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 

[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493841#comment-15493841
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79006658
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
 ---
@@ -82,43 +82,46 @@ protected void initialize() throws Exception {
public void run() throws Exception {

SuperstepKickoffLatch nextSuperstepLatch = 
SuperstepKickoffLatchBroker.instance().get(brokerKey());
+   try {
+   while (this.running && !terminationRequested()) {
 
-   while (this.running && !terminationRequested()) {
+   if (log.isInfoEnabled()) {
+   log.info(formatLogString("starting 
iteration [" + currentIteration() + "]"));
+   }
 
-   if (log.isInfoEnabled()) {
-   log.info(formatLogString("starting iteration [" 
+ currentIteration() + "]"));
-   }
+   super.run();
 
-   super.run();
+   // check if termination was requested
+   verifyEndOfSuperstepState();
 
-   // check if termination was requested
-   verifyEndOfSuperstepState();
+   if (isWorksetUpdate && isWorksetIteration) {
+   long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
+   
worksetAggregator.aggregate(numCollected);
+   }
 
-   if (isWorksetUpdate && isWorksetIteration) {
-   long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
-   worksetAggregator.aggregate(numCollected);
-   }
-   
-   if (log.isInfoEnabled()) {
-   log.info(formatLogString("finishing iteration 
[" + currentIteration() + "]"));
-   }
-   
-   // let the successors know that the end of this 
superstep data is reached
-   sendEndOfSuperstep();
-   
-   if (isWorksetUpdate) {
-   // notify iteration head if responsible for 
workset update
-   worksetBackChannel.notifyOfEndOfSuperstep();
-   }
-   
-   boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+   if (log.isInfoEnabled()) {
+   log.info(formatLogString("finishing 
iteration [" + currentIteration() + "]"));
+   }
 
-   if (terminated) {
-   requestTermination();
-   }
-   else {
-   incrementIterationCounter();
+   // let the successors know that the end of this 
superstep data is reached
+   sendEndOfSuperstep();
+
+   if (isWorksetUpdate) {
+   // notify iteration head if responsible 
for workset update
+   
worksetBackChannel.notifyOfEndOfSuperstep();
+   }
+
+   boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+
+   if (terminated) {
+   requestTermination();
+   this.driver.cleanup();
--- End diff --

I am very sorry, that was a mistake and an oversight. I forgot to remove the 
cleanup() call after termination.


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that individually can be committed and for better 
> closer reviews.



--
This message was sent by Atlassian JIRA

[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

2016-09-15 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79006658
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
 ---
@@ -82,43 +82,46 @@ protected void initialize() throws Exception {
public void run() throws Exception {

SuperstepKickoffLatch nextSuperstepLatch = 
SuperstepKickoffLatchBroker.instance().get(brokerKey());
+   try {
+   while (this.running && !terminationRequested()) {
 
-   while (this.running && !terminationRequested()) {
+   if (log.isInfoEnabled()) {
+   log.info(formatLogString("starting 
iteration [" + currentIteration() + "]"));
+   }
 
-   if (log.isInfoEnabled()) {
-   log.info(formatLogString("starting iteration [" 
+ currentIteration() + "]"));
-   }
+   super.run();
 
-   super.run();
+   // check if termination was requested
+   verifyEndOfSuperstepState();
 
-   // check if termination was requested
-   verifyEndOfSuperstepState();
+   if (isWorksetUpdate && isWorksetIteration) {
+   long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
+   
worksetAggregator.aggregate(numCollected);
+   }
 
-   if (isWorksetUpdate && isWorksetIteration) {
-   long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
-   worksetAggregator.aggregate(numCollected);
-   }
-   
-   if (log.isInfoEnabled()) {
-   log.info(formatLogString("finishing iteration 
[" + currentIteration() + "]"));
-   }
-   
-   // let the successors know that the end of this 
superstep data is reached
-   sendEndOfSuperstep();
-   
-   if (isWorksetUpdate) {
-   // notify iteration head if responsible for 
workset update
-   worksetBackChannel.notifyOfEndOfSuperstep();
-   }
-   
-   boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+   if (log.isInfoEnabled()) {
+   log.info(formatLogString("finishing 
iteration [" + currentIteration() + "]"));
+   }
 
-   if (terminated) {
-   requestTermination();
-   }
-   else {
-   incrementIterationCounter();
+   // let the successors know that the end of this 
superstep data is reached
+   sendEndOfSuperstep();
+
+   if (isWorksetUpdate) {
+   // notify iteration head if responsible 
for workset update
+   
worksetBackChannel.notifyOfEndOfSuperstep();
+   }
+
+   boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+
+   if (terminated) {
+   requestTermination();
+   this.driver.cleanup();
--- End diff --

I am very sorry; that was a mistake, an oversight. I forgot to remove the 
cleanup() call after termination.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493835#comment-15493835
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79005879
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

I think you should use the already existing `ResettableDriver` interface 
instead of introducing a new resetting method.
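
For reference, a rough sketch of the resettable-driver idea suggested here. This is a 
paraphrase for illustration only; the real contract lives in 
{{org.apache.flink.runtime.operators.ResettableDriver}} and its method names may differ slightly:

{code}
// Illustrative paraphrase of a resettable-driver contract (not the actual Flink interface).
// The point: memory and iterators are acquired once, reset between supersteps,
// and torn down once, instead of adding a separate resetForIterativeTasks() method.
interface ResettableDriverSketch<OT> {

    // True if the given input can be replayed in the next superstep.
    boolean isInputResettable(int inputNum);

    // Acquire memory segments / sorters once, before the first superstep.
    void initialize() throws Exception;

    // Clear per-superstep state while keeping the allocated resources.
    void reset() throws Exception;

    // Release all resources once, after the last superstep.
    void teardown() throws Exception;
}
{code}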


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that it can be committed individually and reviewed 
> more closely.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

2016-09-15 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79005879
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 * @throws Exception Exceptions may be forwarded.
 */
void cancel() throws Exception;
+
+   void resetForIterativeTasks() throws Exception;
--- End diff --

I think you should use the already existing `ResettableDriver` interface 
instead of introducing a new resetting method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...

2016-09-15 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79003843
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
 ---
@@ -82,43 +82,46 @@ protected void initialize() throws Exception {
public void run() throws Exception {

SuperstepKickoffLatch nextSuperstepLatch = 
SuperstepKickoffLatchBroker.instance().get(brokerKey());
+   try {
+   while (this.running && !terminationRequested()) {
 
-   while (this.running && !terminationRequested()) {
+   if (log.isInfoEnabled()) {
+   log.info(formatLogString("starting 
iteration [" + currentIteration() + "]"));
+   }
 
-   if (log.isInfoEnabled()) {
-   log.info(formatLogString("starting iteration [" 
+ currentIteration() + "]"));
-   }
+   super.run();
 
-   super.run();
+   // check if termination was requested
+   verifyEndOfSuperstepState();
 
-   // check if termination was requested
-   verifyEndOfSuperstepState();
+   if (isWorksetUpdate && isWorksetIteration) {
+   long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
+   
worksetAggregator.aggregate(numCollected);
+   }
 
-   if (isWorksetUpdate && isWorksetIteration) {
-   long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
-   worksetAggregator.aggregate(numCollected);
-   }
-   
-   if (log.isInfoEnabled()) {
-   log.info(formatLogString("finishing iteration 
[" + currentIteration() + "]"));
-   }
-   
-   // let the successors know that the end of this 
superstep data is reached
-   sendEndOfSuperstep();
-   
-   if (isWorksetUpdate) {
-   // notify iteration head if responsible for 
workset update
-   worksetBackChannel.notifyOfEndOfSuperstep();
-   }
-   
-   boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+   if (log.isInfoEnabled()) {
+   log.info(formatLogString("finishing 
iteration [" + currentIteration() + "]"));
+   }
 
-   if (terminated) {
-   requestTermination();
-   }
-   else {
-   incrementIterationCounter();
+   // let the successors know that the end of this 
superstep data is reached
+   sendEndOfSuperstep();
+
+   if (isWorksetUpdate) {
+   // notify iteration head if responsible 
for workset update
+   
worksetBackChannel.notifyOfEndOfSuperstep();
+   }
+
+   boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+
+   if (terminated) {
+   requestTermination();
+   this.driver.cleanup();
--- End diff --

`cleanup` will be called twice at the last step of the iteration, because 
it is also called in the `finally` block. This might potentially cause problems 
if one of the drivers assumes that `cleanup` will only be called once.
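
A minimal, hypothetical sketch of one way to make the double invocation harmless 
(an idempotence guard inside the driver; this is not the actual Flink driver code):

```java
// Hypothetical guard: cleanup() becomes a no-op on the second call, so invoking it
// from both the termination branch and the finally block releases resources only once.
public class GuardedCleanupSketch {

    private boolean cleanedUp;

    public void cleanup() throws Exception {
        if (cleanedUp) {
            return; // already cleaned up, second call does nothing
        }
        cleanedUp = true;
        releaseResources();
    }

    private void releaseResources() {
        // free memory segments, close iterators, etc.
    }
}
```

Whether such a guard should live in the drivers, or whether the task should simply 
avoid the second call, is a design choice for this PR.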


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-4590) Some Table API tests are failing when debug lvl is set to DEBUG

2016-09-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-4590:
---

Assignee: Timo Walther

> Some Table API tests are failing when debug lvl is set to DEBUG
> ---
>
> Key: FLINK-4590
> URL: https://issues.apache.org/jira/browse/FLINK-4590
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Timo Walther
>
> For debugging another issue, I've set the log level on travis to DEBUG.
> After that, the Table API tests started failing
> {code}
> Failed tests: 
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
> {code}
> Probably Calcite is executing additional assertions depending on the debug 
> level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493806#comment-15493806
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2496#discussion_r79003843
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
 ---
@@ -82,43 +82,46 @@ protected void initialize() throws Exception {
public void run() throws Exception {

SuperstepKickoffLatch nextSuperstepLatch = 
SuperstepKickoffLatchBroker.instance().get(brokerKey());
+   try {
+   while (this.running && !terminationRequested()) {
 
-   while (this.running && !terminationRequested()) {
+   if (log.isInfoEnabled()) {
+   log.info(formatLogString("starting 
iteration [" + currentIteration() + "]"));
+   }
 
-   if (log.isInfoEnabled()) {
-   log.info(formatLogString("starting iteration [" 
+ currentIteration() + "]"));
-   }
+   super.run();
 
-   super.run();
+   // check if termination was requested
+   verifyEndOfSuperstepState();
 
-   // check if termination was requested
-   verifyEndOfSuperstepState();
+   if (isWorksetUpdate && isWorksetIteration) {
+   long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
+   
worksetAggregator.aggregate(numCollected);
+   }
 
-   if (isWorksetUpdate && isWorksetIteration) {
-   long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
-   worksetAggregator.aggregate(numCollected);
-   }
-   
-   if (log.isInfoEnabled()) {
-   log.info(formatLogString("finishing iteration 
[" + currentIteration() + "]"));
-   }
-   
-   // let the successors know that the end of this 
superstep data is reached
-   sendEndOfSuperstep();
-   
-   if (isWorksetUpdate) {
-   // notify iteration head if responsible for 
workset update
-   worksetBackChannel.notifyOfEndOfSuperstep();
-   }
-   
-   boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+   if (log.isInfoEnabled()) {
+   log.info(formatLogString("finishing 
iteration [" + currentIteration() + "]"));
+   }
 
-   if (terminated) {
-   requestTermination();
-   }
-   else {
-   incrementIterationCounter();
+   // let the successors know that the end of this 
superstep data is reached
+   sendEndOfSuperstep();
+
+   if (isWorksetUpdate) {
+   // notify iteration head if responsible 
for workset update
+   
worksetBackChannel.notifyOfEndOfSuperstep();
+   }
+
+   boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+
+   if (terminated) {
+   requestTermination();
+   this.driver.cleanup();
--- End diff --

`cleanup` will be called twice at the last step of the iteration, because 
it is also called in the `finally` block. This might potentially cause problems 
if one of the drivers assumes that `cleanup` will only be called once.


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that it can be committed individually and reviewed 
> more closely.

[GitHub] flink issue #2454: [FLINK-4546] [table] Remove STREAM keyword in Stream SQL

2016-09-15 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2454
  
It seems that Calcite is planning to add additional clauses to the SQL 
statement, like `SELECT STREAM x FROM y EMIT xyz`. If we really remove the 
keyword, we have to make sure that Calcite's parser will also parse `EMIT` 
without the `STREAM` keyword.

@fhueske what do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493770#comment-15493770
 ] 

ASF GitHub Bot commented on FLINK-4546:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2454
  
It seems that Calcite is planning to add additional clauses to the SQL 
statement, like `SELECT STREAM x FROM y EMIT xyz`. If we really remove the 
keyword, we have to make sure that Calcite's parser will also parse `EMIT` 
without the `STREAM` keyword.

@fhueske what do you think?


>  Remove STREAM keyword in Stream SQL
> 
>
> Key: FLINK-4546
> URL: https://issues.apache.org/jira/browse/FLINK-4546
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> This issue is about unifying the Batch SQL and Stream SQL grammar, esp. removing 
> the STREAM keyword in Stream SQL.
> Detailed discussion on the mailing list: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4623) Create Physical Execution Plan of a DataStream

2016-09-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-4623:

Labels: starter  (was: )

> Create Physical Execution Plan of a DataStream
> --
>
> Key: FLINK-4623
> URL: https://issues.apache.org/jira/browse/FLINK-4623
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> The {{StreamTableEnvironment#explain(Table)}} command for tables of a 
> {{StreamTableEnvironment}} only outputs the abstract syntax tree. It would be 
> helpful if the {{explain}} method could also generate a string from the 
> {{DataStream}} containing a physical execution plan.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4623) Create Physical Execution Plan of a DataStream

2016-09-15 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4623:
---

 Summary: Create Physical Execution Plan of a DataStream
 Key: FLINK-4623
 URL: https://issues.apache.org/jira/browse/FLINK-4623
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther


The {{StreamTableEnvironment#explain(Table)}} command for tables of a 
{{StreamTableEnvironment}} only outputs the abstract syntax tree. It would be 
helpful if the {{explain}} method could also generate a string from the 
{{DataStream}} containing a physical execution plan.
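
As a starting point, the streaming environment can already dump its job graph; a minimal 
sketch, assuming the standard {{StreamExecutionEnvironment#getExecutionPlan()}} JSON output 
is an acceptable basis for such a plan string:

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionPlanSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            })
            .print();

        // Prints the streaming job graph as JSON; an explain() implementation
        // would presumably derive its physical plan string from similar information.
        System.out.println(env.getExecutionPlan());
    }
}
{code}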



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment

2016-09-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4599.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 545b72bee9b2297c9d1d2f5d59d6d839378fde92. I will create an issue 
regarding the Physical Stream Execution Plan.

> Add 'explain()' also to StreamTableEnvironment
> --
>
> Key: FLINK-4599
> URL: https://issues.apache.org/jira/browse/FLINK-4599
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Simone Robutti
>  Labels: starter
> Fix For: 1.2.0
>
>
> Currently, only the BatchTableEnvironment supports the {{explain}} command 
> for tables. We should also support it for the StreamTableEnvironment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...

2016-09-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2485


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493730#comment-15493730
 ] 

ASF GitHub Bot commented on FLINK-3322:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2495
  
The build shows a failure, but I am not sure what the failure is. It does not 
point to any specific failing test case.


> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
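
For completeness, the preallocation behaviour referred to above is a single switch in 
{{flink-conf.yaml}}; a sketch of the non-default configuration (shown only to illustrate 
the option, not as a recommendation):

{code}
# Pre-allocate and pool managed memory segments at TaskManager start-up,
# instead of allocating lazily and leaving released segments to the GC.
taskmanager.memory.preallocate: true
{code}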



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2495: FLINK-3322 - Make sorters to reuse the memory pages alloc...

2016-09-15 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2495
  
The build shows a failure, but I am not sure what the failure is. It does not 
point to any specific failing test case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2496: FLINK-4615 Reusing the memory allocated for the drivers a...

2016-09-15 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2496
  
Builds seem to pass with this latest commit. 
Any feedback here, @ggevay and @StephanEwen?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493720#comment-15493720
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2496
  
Builds seem to pass with this latest commit. 
Any feedback here, @ggevay and @StephanEwen?


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that it can be committed individually and reviewed 
> more closely.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2485: [Flink 4599] - Add 'explain()' also to StreamTableEnviron...

2016-09-15 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2485
  
Thanks @chobeat. I fixed minor formatting issues and extended the 
documentation a little bit. Will merge...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-15 Thread Matthew Barlocker (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493667#comment-15493667
 ] 

Matthew Barlocker commented on FLINK-4617:
--

Sounds good. Thanks!

> Kafka & Flink duplicate messages on restart
> ---
>
> Key: FLINK-4617
> URL: https://issues.apache.org/jira/browse/FLINK-4617
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
> Environment: Ubuntu 16.04
> Flink 1.1.*
> Kafka 0.9.0.1
> Scala 2.11.7
> Java 1.8.0_91
>Reporter: Matthew Barlocker
>Priority: Critical
>
> [StackOverflow 
> Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]
> Flink (the kafka connector) re-runs the last 3-9 messages it saw before it 
> was shut down.
> *My code:*
> {code}
> import java.util.Properties
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.CheckpointingMode
> import org.apache.flink.streaming.connectors.kafka._
> import org.apache.flink.streaming.util.serialization._
> import org.apache.flink.runtime.state.filesystem._
> object Runner {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(500)
> env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
> 
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "testing");
> val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new 
> SimpleStringSchema(), properties)
> val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", 
> "testing-out", new SimpleStringSchema())
> env.addSource(kafkaConsumer)
>   .addSink(kafkaProducer)
> env.execute()
>   }
> }
> {code}
> *My sbt dependencies:*
> {code}
> libraryDependencies ++= Seq(
> "org.apache.flink" %% "flink-scala" % "1.1.2",
> "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
> "org.apache.flink" %% "flink-clients" % "1.1.2",
> "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
> "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
> )
> {code}
> *My process:*
> using 3 terminals:
> {code}
> TERM-1 start sbt, run program
> TERM-2 create kafka topics testing-in and testing-out
> TERM-2 run kafka-console-producer on testing-in topic
> TERM-3 run kafka-console-consumer on testing-out topic
> TERM-2 send data to kafka producer.
> Wait for a couple seconds (buffers need to flush)
> TERM-3 watch data appear in testing-out topic
> Wait for at least 500 milliseconds for checkpointing to happen
> TERM-1 stop sbt
> TERM-1 run sbt
> TERM-3 watch last few lines of data appear in testing-out topic
> {code}
> *My expectations:*
> When there are no errors in the system, I expect to be able to turn flink on 
> and off without reprocessing messages that successfully completed the stream 
> in a prior run.
> *My attempts to fix:*
> I've added the call to setStateBackend, thinking that perhaps the default 
> memory backend just didn't remember correctly. That didn't seem to help.
> I've removed the call to enableCheckpointing, hoping that perhaps there was a 
> separate mechanism to track state in Flink vs Zookeeper. That didn't seem to 
> help.
> I've used different sinks, RollingFileSink, print(); hoping that maybe the 
> bug was in kafka. That didn't seem to help.
> I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
> maybe the bug was in the latest version. That didn't seem to help.
> I've added the zookeeper.connect config to the properties object, hoping that 
> the comment about it only being useful in 0.8 was wrong. That didn't seem to 
> help.
> I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea 
> drfloob). That didn't seem to help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2502: [FLINK-4550] [table] Clearly define SQL operator t...

2016-09-15 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2502

[FLINK-4550] [table] Clearly define SQL operator table

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR reduces the number of SQL operators to those that are fully 
supported. Since the SQL validator knows which functions/operations are 
supported, this PR should also reduce the number of exceptions in code 
generation etc.
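
A minimal sketch of the whitelisting idea, using Calcite's `ListSqlOperatorTable` purely 
for illustration; the actual `FunctionCatalog` change in this PR may be structured 
differently:

```java
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.util.ListSqlOperatorTable;

public class WhitelistedOperatorTableSketch {

    // Builds an operator table that exposes only explicitly registered operators,
    // instead of everything returned by SqlStdOperatorTable.instance().
    public static SqlOperatorTable create() {
        ListSqlOperatorTable table = new ListSqlOperatorTable();
        table.add(SqlStdOperatorTable.AND);
        table.add(SqlStdOperatorTable.OR);
        table.add(SqlStdOperatorTable.NOT);
        table.add(SqlStdOperatorTable.EQUALS);
        table.add(SqlStdOperatorTable.PLUS);
        table.add(SqlStdOperatorTable.MINUS);
        // ... only operators that are tested and documented get registered here
        return table;
    }
}
```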



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-4550

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2502.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2502


commit 1790235551cd54a3b5f7c7b38c251f0af8d2ce76
Author: twalthr 
Date:   2016-09-15T14:56:16Z

[FLINK-4550] [table] Clearly define SQL operator table




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4550) Clearly define SQL operator table

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493575#comment-15493575
 ] 

ASF GitHub Bot commented on FLINK-4550:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2502

[FLINK-4550] [table] Clearly define SQL operator table

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR reduces the number of SQL operators to those that are fully 
supported. Since the SQL validator knows which functions/operations are 
supported, this PR should also reduce the number of exceptions in code 
generation etc.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-4550

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2502.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2502


commit 1790235551cd54a3b5f7c7b38c251f0af8d2ce76
Author: twalthr 
Date:   2016-09-15T14:56:16Z

[FLINK-4550] [table] Clearly define SQL operator table




> Clearly define SQL operator table
> -
>
> Key: FLINK-4550
> URL: https://issues.apache.org/jira/browse/FLINK-4550
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>  Labels: starter
>
> Currently, we use {{SqlStdOperatorTable.instance()}} for setting all 
> supported operations. However, not all of them are actually supported. 
> {{FunctionCatalog}} should only return those operators that are tested and 
> documented.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4622) CLI help message should include 'savepoint' action

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493533#comment-15493533
 ] 

ASF GitHub Bot commented on FLINK-4622:
---

GitHub user skidder opened a pull request:

https://github.com/apache/flink/pull/2501

[FLINK-4622] CLI help message should include 'savepoint' action

The Flink CLI help message should include the 'savepoint' action in the 
list of available actions. It currently looks like:

```shell
bash-4.3# flink foo
"foo" is not a valid action.

Valid actions are "run", "list", "info", "stop", or "cancel".

Specify the version option (-v or --version) to print Flink version.

Specify the help option (-h or --help) to get help on the command.
```

This pull-request adds the `savepoint` action to the list of valid actions.
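
After the change, the same invocation would presumably print something along these 
lines (exact wording and ordering are an assumption, not quoted from the patch):

```shell
bash-4.3# flink foo
"foo" is not a valid action.

Valid actions are "run", "list", "info", "savepoint", "stop", or "cancel".
```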

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skidder/flink flink-4622

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2501.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2501


commit a309fd4560ca5e08d05eb38fc377939aa87a5add
Author: Scott Kidder 
Date:   2016-09-15T14:46:29Z

[FLINK-4622] Include 'savepoint' in the CLI help message




> CLI help message should include 'savepoint' action
> --
>
> Key: FLINK-4622
> URL: https://issues.apache.org/jira/browse/FLINK-4622
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.1.2
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>Priority: Trivial
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The Flink CLI help message should include the 'savepoint' action in the list 
> of available actions. It currently looks like:
> {code}
> bash-4.3# flink foo
> "foo" is not a valid action.
> Valid actions are "run", "list", "info", "stop", or "cancel".
> Specify the version option (-v or --version) to print Flink version.
> Specify the help option (-h or --help) to get help on the command.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2501: [FLINK-4622] CLI help message should include 'save...

2016-09-15 Thread skidder
GitHub user skidder opened a pull request:

https://github.com/apache/flink/pull/2501

[FLINK-4622] CLI help message should include 'savepoint' action

The Flink CLI help message should include the 'savepoint' action in the 
list of available actions. It currently looks like:

```shell
bash-4.3# flink foo
"foo" is not a valid action.

Valid actions are "run", "list", "info", "stop", or "cancel".

Specify the version option (-v or --version) to print Flink version.

Specify the help option (-h or --help) to get help on the command.
```

This pull-request adds the `savepoint` action to the list of valid actions.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skidder/flink flink-4622

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2501.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2501


commit a309fd4560ca5e08d05eb38fc377939aa87a5add
Author: Scott Kidder 
Date:   2016-09-15T14:46:29Z

[FLINK-4622] Include 'savepoint' in the CLI help message




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4622) CLI help message should include 'savepoint' action

2016-09-15 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-4622:
---

 Summary: CLI help message should include 'savepoint' action
 Key: FLINK-4622
 URL: https://issues.apache.org/jira/browse/FLINK-4622
 Project: Flink
  Issue Type: Bug
  Components: Client
Affects Versions: 1.1.2
Reporter: Scott Kidder
Assignee: Scott Kidder
Priority: Trivial


The Flink CLI help message should include the 'savepoint' action in the list of 
available actions. It currently looks like:

{code}
bash-4.3# flink foo
"foo" is not a valid action.

Valid actions are "run", "list", "info", "stop", or "cancel".

Specify the version option (-v or --version) to print Flink version.

Specify the help option (-h or --help) to get help on the command.
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493517#comment-15493517
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
> As for this pull request I suggest updating the incorrect Javadoc from the 
streaming API and documenting in the concepts that the aggregation field accessors 
and the key selectors are slightly different due to their semantics.

I've fixed the javadoc now in 154352ba879350bb1ab10b799456a67eb5819375.

As for the differences between the key selectors and aggregations (which 
use `FieldAccessor`): I tried to consolidate the supported field expressions 
between the two in 7c2dceeb87b06963cc2618312f5f1039387003ad. Now the 
aggregations should accept any field expression that selects a single field and 
would be a valid key selector, and some more (selecting array elements).

As for documenting that the aggregations don't work on a field that is 
still some composite type, I'm not sure if this is necessary, since this should 
be pretty obvious to the user.

> The batch API aggregations do not have the overloaded version where you 
could pass a field accessor string.

Here is the Jira for this: https://issues.apache.org/jira/browse/FLINK-4575
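
As a hypothetical usage sketch of what that consolidation means for users (the POJO and 
field names are invented for illustration and assume the nested field expressions from 
this PR):

{code}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NestedAggregationSketch {

    // Hypothetical POJOs, only for illustration.
    public static class Stats {
        public long count;
        public Stats() {}
        public Stats(long c) { count = c; }
    }

    public static class Event {
        public String user;
        public Stats stats;
        public Event() {}
        public Event(String u, long c) { user = u; stats = new Stats(c); }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Event> events = env.fromElements(new Event("a", 1), new Event("a", 2));

        // The same field-expression style works for the key selector and,
        // with this PR, also for the aggregation on a nested field.
        events.keyBy("user")
            .sum("stats.count")
            .print();

        env.execute("nested aggregation sketch");
    }
}
{code}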


> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2094: [FLINK-3702] Make FieldAccessors support nested field exp...

2016-09-15 Thread ggevay
Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2094
  
> As for this pull request I suggest updating the incorrect Javadoc from the 
streaming API and documenting in the concepts that the aggregation field accessors 
and the key selectors are slightly different due to their semantics.

I've fixed the javadoc now in 154352ba879350bb1ab10b799456a67eb5819375.

As for the differences between the key selectors and aggregations (which 
use `FieldAccessor`): I tried to consolidate the supported field expressions 
between the two in 7c2dceeb87b06963cc2618312f5f1039387003ad. Now the 
aggregations should accept any field expression that selects a single field and 
would be a valid key selector, and some more (selecting array elements).

As for documenting that the aggregations don't work on a field that is 
still some composite type, I'm not sure if this is necessary, since this should 
be pretty obvious to the user.

> The batch API aggregations do not have the overloaded version where you 
could pass a field accessor string.

Here is the Jira for this: https://issues.apache.org/jira/browse/FLINK-4575


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4561) replace all the scala version as a `scala.binary.version` property

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493507#comment-15493507
 ] 

ASF GitHub Bot commented on FLINK-4561:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2459
  
@shijinkui I understand your motivation; it is a good idea in general. 
Please do not feel discouraged from suggesting improvements.

There are some things that look like they could be done better, but 
actually cannot be changed, because of some tricky bugs or limitations in other 
parts (like build plugins). That was the case here, so sometimes we cannot 
change some things.


> replace all the scala version as a `scala.binary.version` property
> --
>
> Key: FLINK-4561
> URL: https://issues.apache.org/jira/browse/FLINK-4561
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> replace all the scala version(2.10) as a property `scala.binary.version` 
> defined in root pom properties. default scala version property is 2.10.
> modify:
> 1. dependency include scala version 
> 2. module defining include scala version
> 3. scala version upgrade to 2.11.8 from 2.11.7



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2459: [FLINK-4561] replace all the scala version as a `scala.bi...

2016-09-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2459
  
@shijinkui I understand your motivation; it is a good idea in general. 
Please do not feel discouraged from suggesting improvements.

There are some things that look like they could be done better, but 
actually cannot be changed, because of some tricky bugs or limitations in other 
parts (like build plugins). That was the case here, so sometimes we cannot 
change some things.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3442) Expose savepoint button on web ui

2016-09-15 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493485#comment-15493485
 ] 

Scott Kidder commented on FLINK-3442:
-

This is duplicated by FLINK-4336

> Expose savepoint button on web ui
> -
>
> Key: FLINK-3442
> URL: https://issues.apache.org/jira/browse/FLINK-3442
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Webfrontend
>Reporter: Gyula Fora
>Priority: Minor
>
> Similarly to Cancel there should be a Savepoint button to initiate a 
> savepoint for streaming jobs.
> These 2 buttons should NOT be next to each other :)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-09-15 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493427#comment-15493427
 ] 

Vasia Kalavri commented on FLINK-2254:
--

It sounds good to me [~ivan.mushketyk]. Please go ahead and break this into 
smaller tasks. We can then prioritize. Thanks!

> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph whose vertices can be divided into two disjoint 
> sets such that every edge has its source vertex in the first set and its 
> target vertex in the second set. We would like to support efficient operations 
> for this type of graph, along with a set of 
> metrics (http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4620) Automatically creating savepoints

2016-09-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493397#comment-15493397
 ] 

Stephan Ewen commented on FLINK-4620:
-

Adding [~uce] to this thread.

There is a FLIP to unify checkpoints and savepoints: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints

I think periodic savepoints are quite reasonable and fit well with the concept 
of unified checkpoints and savepoints.

> Automatically creating savepoints
> -
>
> Key: FLINK-4620
> URL: https://issues.apache.org/jira/browse/FLINK-4620
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.2
>Reporter: Niels Basjes
>
> In the current versions of Flink you can run an external command and then a 
> savepoint is persisted in a durable location.
> Feature request: Make this a lot more automatic and easy to use.
> _Proposed workflow_
> # In my application I do something like this:
> {code}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new FsStateBackend("hdfs:///tmp/applicationState"));
> env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> env.enableAutomaticSavePoints(30);
> env.enableAutomaticSavePointCleaner(10);
> {code}
> # When I start the application for the first time the state backend is 
> 'empty'. 
> I expect the system to start in a clean state.
> After 10 minutes (30ms) a savepoint is created and stored.
> # When I stop and start the topology again it will automatically restore the 
> last available savepoint.
> Things to think about:
> * Note that this feature still means the manual version is useful!!
> * What to do on startup if the state is incompatible with the topology? Fail 
> the startup?
> * How many automatic savepoints to we keep? Only the last one?
> * Perhaps the API should allow multiple automatic savepoints at different 
> intervals in different locations.
> {code}
> // Make every 10 minutes and keep the last 10
> env.enableAutomaticSavePoints(30, new 
> FsStateBackend("hdfs:///tmp/applicationState"), 10);
> // Make every 24 hours and keep the last 30
> // Useful for being able to reproduce a problem a few days later
> env.enableAutomaticSavePoints(8640, new 
> FsStateBackend("hdfs:///tmp/applicationDailyStateSnapshot"), 30);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4550) Clearly define SQL operator table

2016-09-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-4550:
---

Assignee: Timo Walther

> Clearly define SQL operator table
> -
>
> Key: FLINK-4550
> URL: https://issues.apache.org/jira/browse/FLINK-4550
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>  Labels: starter
>
> Currently, we use {{SqlStdOperatorTable.instance()}} for setting all 
> supported operations. However, not all of them are actually supported. 
> {{FunctionCatalog}} should only return those operators that are tested and 
> documented.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4549) Test and document implicitly supported SQL functions

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493229#comment-15493229
 ] 

ASF GitHub Bot commented on FLINK-4549:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2500

[FLINK-4549] [table] Test and document implicitly supported SQL functions

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR tests and documents all SQL functions that are currently supported. 
Things that are not documented are not supposed to work.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink 
TableApiAllCalciteScalarFunctions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2500


commit 782fc7c09da25c44ecb61620acf2f7ce9d7077ad
Author: twalthr 
Date:   2016-09-02T09:00:09Z

[FLINK-4549] [table] Test and document implicitly supported SQL functions




> Test and document implicitly supported SQL functions
> 
>
> Key: FLINK-4549
> URL: https://issues.apache.org/jira/browse/FLINK-4549
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Calcite supports many SQL functions by translating them into {{RexNode}}s. 
> However, SQL functions like {{NULLIF}} and {{OVERLAPS}} are neither tested nor 
> documented, although they are supported.
> These functions should be tested and added to the documentation. We could 
> adopt parts from the Calcite documentation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2500: [FLINK-4549] [table] Test and document implicitly ...

2016-09-15 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2500

[FLINK-4549] [table] Test and document implicitly supported SQL functions

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR tests and documents all SQL functions that are currently supported. 
Things that are not documented are not supposed to work.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink 
TableApiAllCalciteScalarFunctions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2500


commit 782fc7c09da25c44ecb61620acf2f7ce9d7077ad
Author: twalthr 
Date:   2016-09-02T09:00:09Z

[FLINK-4549] [table] Test and document implicitly supported SQL functions




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2499: [FLINK-4485] close and remove user class loader af...

2016-09-15 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2499

[FLINK-4485] close and remove user class loader after job completion

Keeping the user class loader around after job completion may lead to
excessive temp space usage because all user jars are kept until the
class loader is garbage collected. Tests showed that garbage collection
can be delayed for a long time after the class loader is not referenced
anymore. Note that for the class loader to not be referenced anymore,
its job has to be removed from the archive.

The fastest way to minimize temp space usage is to close and remove the
URLClassloader after job completion. This requires us to keep a
serializable copy of all data which needs the user class loader after
job completion, e.g. to display data on the web interface.
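
A rough, hypothetical sketch of the mechanism described above (not the code from this 
patch); closing and dereferencing the URL class loader is what releases the jar file 
handles:

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

public class UserCodeClassLoaderRelease {

    private URLClassLoader userCodeClassLoader;

    public UserCodeClassLoaderRelease(URL[] userJarUrls) {
        this.userCodeClassLoader = new URLClassLoader(userJarUrls);
    }

    // Called after the finished job has been converted into a serializable,
    // class-loader-free representation for the archive / web interface.
    public void releaseUserCode() {
        if (userCodeClassLoader != null) {
            try {
                // URLClassLoader.close() (Java 7+) closes the opened jar files,
                // so their temp copies can actually be deleted.
                userCodeClassLoader.close();
            } catch (IOException e) {
                // best effort: log and continue
            }
            // Drop the reference so the jars are no longer reachable.
            userCodeClassLoader = null;
        }
    }
}
```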

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-4485

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2499.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2499


commit 6ed17b9f5b9c13c80200ccf3db82bbfe727830bb
Author: Maximilian Michels 
Date:   2016-09-15T09:00:58Z

[FLINK-4485] close and remove user class loader after job completion

Keeping the user class loader around after job completion may lead to
excessive temp space usage because all user jars are kept until the
class loader is garbage collected. Tests showed that garbage collection
can be delayed for a long time after the class loader is not referenced
anymore. Note that for the class loader to not be referenced anymore,
its job has to be removed from the archive.

The fastest way to minimize temp space usage is to close and remove the
URLClassloader after job completion. This requires us to keep a
serializable copy of all data which needs the user class loader after
job completion, e.g. to display data on the web interface.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493019#comment-15493019
 ] 

ASF GitHub Bot commented on FLINK-4485:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2499

[FLINK-4485] close and remove user class loader after job completion

Keeping the user class loader around after job completion may lead to
excessive temp space usage because all user jars are kept until the
class loader is garbage collected. Tests showed that garbage collection
can be delayed for a long time after the class loader is not referenced
anymore. Note that for the class loader to not be referenced anymore,
its job has to be removed from the archive.

The fastest way to minimize temp space usage is to close and remove the
URLClassloader after job completion. This requires us to keep a
serializable copy of all data which needs the user class loader after
job completion, e.g. to display data on the web interface.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-4485

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2499.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2499


commit 6ed17b9f5b9c13c80200ccf3db82bbfe727830bb
Author: Maximilian Michels 
Date:   2016-09-15T09:00:58Z

[FLINK-4485] close and remove user class loader after job completion

Keeping the user class loader around after job completion may lead to
excessive temp space usage because all user jars are kept until the
class loader is garbage collected. Tests showed that garbage collection
can be delayed for a long time after the class loader is not referenced
anymore. Note that for the class loader to not be referenced anymore,
its job has to be removed from the archive.

The fastest way to minimize temp space usage is to close and remove the
URLClassLoader after job completion. This requires us to keep a
serializable copy of all data which needs the user class loader after
job completion, e.g. to display data on the web interface.




> Finished jobs in yarn session fill /tmp filesystem
> --
>
> Key: FLINK-4485
> URL: https://issues.apache.org/jira/browse/FLINK-4485
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.1.0
>Reporter: Niels Basjes
>Assignee: Maximilian Michels
>Priority: Blocker
>
> On a Yarn cluster I start a yarn-session with a few containers and task slots.
> Then I fire a 'large' number of Flink batch jobs in sequence against this 
> yarn session. It is the exact same job (java code) yet it gets different 
> parameters.
> In this scenario it is exporting HBase tables to files in HDFS and the 
> parameters are about which data from which tables and the name of the target 
> directory.
> After running several dozen jobs, job submissions started to fail and we 
> investigated.
> We found that the cause was that on the Yarn node which was hosting the 
> jobmanager the /tmp file system was full (4GB was 100% full).
> However, the output of {{du -hcs /tmp}} showed only 200MB in use.
> We found that a very large file (we guess it is the jar of the job) was put 
> in /tmp, used, and deleted, yet the file handle was not closed by the jobmanager.
> As soon as we killed the jobmanager the disk space was freed.
> The summary of the impact of this is that a yarn-session that receives enough 
> jobs brings down the Yarn node for all users.
> See parts of the output we got from {{lsof}} below.
> {code}
> COMMAND PID  USER   FD  TYPE DEVICE  SIZE   
> NODE NAME
> java  15034   nbasjes  550r  REG 253,17  66219695
> 245 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 
> (deleted)
> java  15034   nbasjes  551r  REG 253,17  66219695
> 252 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 
> (deleted)
> java  15034   nbasjes  552r  REG 253,17  66219695
> 267 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 
> (deleted)
> java  15034   nbasjes  553r  REG 253,17  66219695
> 250 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 
> (deleted)
> java  15034   nbasjes  554r  REG 253,17  66219695
> 288 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 
> (deleted)
> java  15034   

[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492928#comment-15492928
 ] 

ASF GitHub Bot commented on FLINK-3030:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2448
  
@StephanEwen Should I change anything else in this PR?


> Enhance Dashboard to show Execution Attempts
> 
>
> Key: FLINK-3030
> URL: https://issues.apache.org/jira/browse/FLINK-3030
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
> Fix For: 1.0.0
>
>
> Currently, the web dashboard shows only the latest execution attempt. We 
> should make all execution attempts and their accumulators available for 
> inspection.
> The REST monitoring API supports this, so it should be a change only to the 
> frontend part.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2448: [FLINK-3030][web frontend] Enhance dashboard to show exec...

2016-09-15 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2448
  
@StephanEwen Should I change anything else in this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4621) Improve decimal literals of SQL API

2016-09-15 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4621:
---

 Summary: Improve decimal literals of SQL API
 Key: FLINK-4621
 URL: https://issues.apache.org/jira/browse/FLINK-4621
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther


Currently, all SQL {{DECIMAL}} types are converted to BigDecimals internally. 
By default, the SQL parser creates {{DECIMAL}} literals for any number, e.g. 
{{SELECT 1.0, 12, -0.5 FROM x}}. I think it would be better if these simple 
numbers were represented as Java primitives instead of objects.
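
As a rough illustration of the difference (not the actual parser or Table API translation code), the same literals as {{BigDecimal}} objects versus Java primitives:

{code}
import java.math.BigDecimal;

// Illustration only: the same SQL literals in both representations.
public class DecimalLiteralSketch {
    public static void main(String[] args) {
        // Current behaviour described above: every numeric literal becomes a BigDecimal object.
        BigDecimal a = new BigDecimal("1.0");
        BigDecimal b = new BigDecimal("12");
        BigDecimal c = new BigDecimal("-0.5");

        // Proposed: simple literals that fit a primitive type stay primitive,
        // avoiding object allocation and boxing during expression evaluation.
        double x = 1.0;
        int y = 12;
        double z = -0.5;

        System.out.println(a + " " + b + " " + c + " / " + x + " " + y + " " + z);
    }
}
{code}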



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-1135) Blog post with topic "Accessing Data Stored in Hive with Flink"

2016-09-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther closed FLINK-1135.
---
Resolution: Won't Fix

> Blog post with topic "Accessing Data Stored in Hive with Flink"
> ---
>
> Key: FLINK-1135
> URL: https://issues.apache.org/jira/browse/FLINK-1135
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Timo Walther
>Assignee: Robert Metzger
>Priority: Minor
> Attachments: 2014-09-29-querying-hive.md
>
>
> Recently, I implemented a Flink job that accessed Hive. Maybe someone else is 
> going to try this. I created a blog post for the website to share my 
> experience.
> You'll find the blog post file attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1135) Blog post with topic "Accessing Data Stored in Hive with Flink"

2016-09-15 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492738#comment-15492738
 ] 

Timo Walther commented on FLINK-1135:
-

No, this won't happen. I will close the issue.

> Blog post with topic "Accessing Data Stored in Hive with Flink"
> ---
>
> Key: FLINK-1135
> URL: https://issues.apache.org/jira/browse/FLINK-1135
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Timo Walther
>Assignee: Robert Metzger
>Priority: Minor
> Attachments: 2014-09-29-querying-hive.md
>
>
> Recently, I implemented a Flink job that accessed Hive. Maybe someone else is 
> going to try this. I created a blog post for the website to share my 
> experience.
> You'll find the blog post file attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem

2016-09-15 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492595#comment-15492595
 ] 

Maximilian Michels commented on FLINK-4485:
---

Yes, I was able to reproduce the problem with your test application yesterday. 
Checked out the relevant code and I think I'm on to something.

> Finished jobs in yarn session fill /tmp filesystem
> --
>
> Key: FLINK-4485
> URL: https://issues.apache.org/jira/browse/FLINK-4485
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.1.0
>Reporter: Niels Basjes
>Assignee: Maximilian Michels
>Priority: Blocker
>
> On a Yarn cluster I start a yarn-session with a few containers and task slots.
> Then I fire a 'large' number of Flink batch jobs in sequence against this 
> yarn session. It is the exact same job (java code) yet it gets different 
> parameters.
> In this scenario it is exporting HBase tables to files in HDFS and the 
> parameters are about which data from which tables and the name of the target 
> directory.
> After running several dozen jobs, job submissions started to fail and we 
> investigated.
> We found that the cause was that on the Yarn node which was hosting the 
> jobmanager the /tmp file system was full (4GB was 100% full).
> However, the output of {{du -hcs /tmp}} showed only 200MB in use.
> We found that a very large file (we guess it is the jar of the job) was put 
> in /tmp, used, and deleted, yet the file handle was not closed by the jobmanager.
> As soon as we killed the jobmanager the disk space was freed.
> The summary of the impact of this is that a yarn-session that receives enough 
> jobs brings down the Yarn node for all users.
> See parts of the output we got from {{lsof}} below.
> {code}
> COMMAND PID  USER   FD  TYPE DEVICE  SIZE   
> NODE NAME
> java  15034   nbasjes  550r  REG 253,17  66219695
> 245 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 
> (deleted)
> java  15034   nbasjes  551r  REG 253,17  66219695
> 252 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 
> (deleted)
> java  15034   nbasjes  552r  REG 253,17  66219695
> 267 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 
> (deleted)
> java  15034   nbasjes  553r  REG 253,17  66219695
> 250 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 
> (deleted)
> java  15034   nbasjes  554r  REG 253,17  66219695
> 288 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 
> (deleted)
> java  15034   nbasjes  555r  REG 253,17  66219695
> 298 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0025 
> (deleted)
> java  15034   nbasjes  557r  REG 253,17  66219695
> 254 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0008 
> (deleted)
> java  15034   nbasjes  558r  REG 253,17  66219695
> 292 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0019 
> (deleted)
> java  15034   nbasjes  559r  REG 253,17  66219695
> 275 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0013 
> (deleted)
> java  15034   nbasjes  560r  REG 253,17  66219695
> 159 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0002 
> (deleted)
> java  15034   nbasjes  562r  REG 253,17  66219695
> 238 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0001 
> (deleted)
> java  15034   nbasjes  568r  REG 253,17  66219695
> 246 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0004 
> (deleted)
> java  15034   nbasjes  569r  REG 253,17  66219695
> 255 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0009 
> (deleted)
> java  15034   nbasjes  571r  REG 253,17  66219695
> 299 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0026 
> (deleted)
> java  15034   nbasjes  572r  REG 253,17  66219695
> 293 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0020 
> (deleted)
> java  15034   nbasjes  574r  REG 253,17  66219695
> 256 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0010 
> (deleted)
> java  15034   nbasjes  575r  REG 253,17  66219695
> 302 
> 

[jira] [Assigned] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem

2016-09-15 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-4485:
-

Assignee: Maximilian Michels

> Finished jobs in yarn session fill /tmp filesystem
> --
>
> Key: FLINK-4485
> URL: https://issues.apache.org/jira/browse/FLINK-4485
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.1.0
>Reporter: Niels Basjes
>Assignee: Maximilian Michels
>Priority: Blocker
>
> On a Yarn cluster I start a yarn-session with a few containers and task slots.
> Then I fire a 'large' number of Flink batch jobs in sequence against this 
> yarn session. It is the exact same job (java code) yet it gets different 
> parameters.
> In this scenario it is exporting HBase tables to files in HDFS and the 
> parameters are about which data from which tables and the name of the target 
> directory.
> After running several dozen jobs, job submissions started to fail and we 
> investigated.
> We found that the cause was that on the Yarn node which was hosting the 
> jobmanager the /tmp file system was full (4GB was 100% full).
> However, the output of {{du -hcs /tmp}} showed only 200MB in use.
> We found that a very large file (we guess it is the jar of the job) was put 
> in /tmp, used, and deleted, yet the file handle was not closed by the jobmanager.
> As soon as we killed the jobmanager the disk space was freed.
> The summary of the impact of this is that a yarn-session that receives enough 
> jobs brings down the Yarn node for all users.
> See parts of the output we got from {{lsof}} below.
> {code}
> COMMAND PID  USER   FD  TYPE DEVICE  SIZE   
> NODE NAME
> java  15034   nbasjes  550r  REG 253,17  66219695
> 245 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 
> (deleted)
> java  15034   nbasjes  551r  REG 253,17  66219695
> 252 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 
> (deleted)
> java  15034   nbasjes  552r  REG 253,17  66219695
> 267 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 
> (deleted)
> java  15034   nbasjes  553r  REG 253,17  66219695
> 250 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 
> (deleted)
> java  15034   nbasjes  554r  REG 253,17  66219695
> 288 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 
> (deleted)
> java  15034   nbasjes  555r  REG 253,17  66219695
> 298 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0025 
> (deleted)
> java  15034   nbasjes  557r  REG 253,17  66219695
> 254 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0008 
> (deleted)
> java  15034   nbasjes  558r  REG 253,17  66219695
> 292 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0019 
> (deleted)
> java  15034   nbasjes  559r  REG 253,17  66219695
> 275 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0013 
> (deleted)
> java  15034   nbasjes  560r  REG 253,17  66219695
> 159 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0002 
> (deleted)
> java  15034   nbasjes  562r  REG 253,17  66219695
> 238 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0001 
> (deleted)
> java  15034   nbasjes  568r  REG 253,17  66219695
> 246 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0004 
> (deleted)
> java  15034   nbasjes  569r  REG 253,17  66219695
> 255 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0009 
> (deleted)
> java  15034   nbasjes  571r  REG 253,17  66219695
> 299 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0026 
> (deleted)
> java  15034   nbasjes  572r  REG 253,17  66219695
> 293 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0020 
> (deleted)
> java  15034   nbasjes  574r  REG 253,17  66219695
> 256 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0010 
> (deleted)
> java  15034   nbasjes  575r  REG 253,17  66219695
> 302 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0029 
> (deleted)
> java  15034   nbasjes  576r  REG 253,17  66219695 

[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492554#comment-15492554
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Thanks @HungUnicorn, that's useful info. I wonder, though, whether this config 
should be set by the user instead of letting the connector set it internally.


> Add reconnect attempt to Elasticsearch host
> ---
>
> Key: FLINK-3857
> URL: https://issues.apache.org/jira/browse/FLINK-3857
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.1.0, 1.0.2
>Reporter: Fabian Hueske
>Assignee: Subhobrata Dey
>
> Currently, the connection to the Elasticsearch host is opened in 
> {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a 
> changed DNS entry), the sink fails.
> I propose to catch the Exception for lost connections in the {{invoke()}} 
> method and try to re-open the connection for a configurable number of times 
> with a certain delay.
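
A minimal sketch of the proposed retry behaviour, assuming hypothetical helpers ({{doSend}}, {{reopenConnection}}) rather than the real ElasticsearchSink internals; the retry count and delay would come from the user-provided configuration:

{code}
// Hypothetical sketch of the proposal: on a lost connection inside invoke(),
// re-open the connection a configurable number of times with a delay.
public class RetryingSender {

    private final int maxRetries;     // configurable number of reconnect attempts
    private final long retryDelayMs;  // configurable delay between attempts

    public RetryingSender(int maxRetries, long retryDelayMs) {
        this.maxRetries = maxRetries;
        this.retryDelayMs = retryDelayMs;
    }

    public void send(String document) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                doSend(document);            // placeholder for the actual bulk request
                return;
            } catch (Exception lostConnection) {
                if (attempt >= maxRetries) {
                    throw lostConnection;    // give up and fail the sink
                }
                Thread.sleep(retryDelayMs);
                reopenConnection();          // placeholder for re-creating the client
            }
        }
    }

    private void doSend(String document) throws Exception { /* ... */ }

    private void reopenConnection() { /* ... */ }
}
{code}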



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

2016-09-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Thanks @HungUnicorn, that's useful info. I wonder, though, whether this config 
should be set by the user instead of letting the connector set it internally.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-4617.
--

> Kafka & Flink duplicate messages on restart
> ---
>
> Key: FLINK-4617
> URL: https://issues.apache.org/jira/browse/FLINK-4617
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
> Environment: Ubuntu 16.04
> Flink 1.1.*
> Kafka 0.9.0.1
> Scala 2.11.7
> Java 1.8.0_91
>Reporter: Matthew Barlocker
>Priority: Critical
>
> [StackOverflow 
> Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]
> Flink (the kafka connector) re-runs the last 3-9 messages it saw before it 
> was shut down.
> *My code:*
> {code}
> import java.util.Properties
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.CheckpointingMode
> import org.apache.flink.streaming.connectors.kafka._
> import org.apache.flink.streaming.util.serialization._
> import org.apache.flink.runtime.state.filesystem._
> object Runner {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(500)
> env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
> 
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "testing");
> val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new 
> SimpleStringSchema(), properties)
> val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", 
> "testing-out", new SimpleStringSchema())
> env.addSource(kafkaConsumer)
>   .addSink(kafkaProducer)
> env.execute()
>   }
> }
> {code}
> *My sbt dependencies:*
> {code}
> libraryDependencies ++= Seq(
> "org.apache.flink" %% "flink-scala" % "1.1.2",
> "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
> "org.apache.flink" %% "flink-clients" % "1.1.2",
> "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
> "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
> )
> {code}
> *My process:*
> using 3 terminals:
> {code}
> TERM-1 start sbt, run program
> TERM-2 create kafka topics testing-in and testing-out
> TERM-2 run kafka-console-producer on testing-in topic
> TERM-3 run kafka-console-consumer on testing-out topic
> TERM-2 send data to kafka producer.
> Wait for a couple seconds (buffers need to flush)
> TERM-3 watch data appear in testing-out topic
> Wait for at least 500 milliseconds for checkpointing to happen
> TERM-1 stop sbt
> TERM-1 run sbt
> TERM-3 watch last few lines of data appear in testing-out topic
> {code}
> *My expectations:*
> When there are no errors in the system, I expect to be able to turn flink on 
> and off without reprocessing messages that successfully completed the stream 
> in a prior run.
> *My attempts to fix:*
> I've added the call to setStateBackend, thinking that perhaps the default 
> memory backend just didn't remember correctly. That didn't seem to help.
> I've removed the call to enableCheckpointing, hoping that perhaps there was a 
> separate mechanism to track state in Flink vs Zookeeper. That didn't seem to 
> help.
> I've used different sinks, RollingFileSink and print(), hoping that maybe the 
> bug was in kafka. That didn't seem to help.
> I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
> maybe the bug was in the latest version. That didn't seem to help.
> I've added the zookeeper.connect config to the properties object, hoping that 
> the comment about it only being useful in 0.8 was wrong. That didn't seem to 
> help.
> I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea 
> drfloob). That didn't seem to help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-4617.

Resolution: Not A Bug

> Kafka & Flink duplicate messages on restart
> ---
>
> Key: FLINK-4617
> URL: https://issues.apache.org/jira/browse/FLINK-4617
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
> Environment: Ubuntu 16.04
> Flink 1.1.*
> Kafka 0.9.0.1
> Scala 2.11.7
> Java 1.8.0_91
>Reporter: Matthew Barlocker
>Priority: Critical
>
> [StackOverflow 
> Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]
> Flink (the kafka connector) re-runs the last 3-9 messages it saw before it 
> was shut down.
> *My code:*
> {code}
> import java.util.Properties
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.CheckpointingMode
> import org.apache.flink.streaming.connectors.kafka._
> import org.apache.flink.streaming.util.serialization._
> import org.apache.flink.runtime.state.filesystem._
> object Runner {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(500)
> env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
> 
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "testing");
> val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new 
> SimpleStringSchema(), properties)
> val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", 
> "testing-out", new SimpleStringSchema())
> env.addSource(kafkaConsumer)
>   .addSink(kafkaProducer)
> env.execute()
>   }
> }
> {code}
> *My sbt dependencies:*
> {code}
> libraryDependencies ++= Seq(
> "org.apache.flink" %% "flink-scala" % "1.1.2",
> "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
> "org.apache.flink" %% "flink-clients" % "1.1.2",
> "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
> "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
> )
> {code}
> *My process:*
> using 3 terminals:
> {code}
> TERM-1 start sbt, run program
> TERM-2 create kafka topics testing-in and testing-out
> TERM-2 run kafka-console-producer on testing-in topic
> TERM-3 run kafka-console-consumer on testing-out topic
> TERM-2 send data to kafka producer.
> Wait for a couple seconds (buffers need to flush)
> TERM-3 watch data appear in testing-out topic
> Wait for at least 500 milliseconds for checkpointing to happen
> TERM-1 stop sbt
> TERM-1 run sbt
> TERM-3 watch last few lines of data appear in testing-out topic
> {code}
> *My expectations:*
> When there are no errors in the system, I expect to be able to turn flink on 
> and off without reprocessing messages that successfully completed the stream 
> in a prior run.
> *My attempts to fix:*
> I've added the call to setStateBackend, thinking that perhaps the default 
> memory backend just didn't remember correctly. That didn't seem to help.
> I've removed the call to enableCheckpointing, hoping that perhaps there was a 
> separate mechanism to track state in Flink vs Zookeeper. That didn't seem to 
> help.
> I've used different sinks, RollingFileSink and print(), hoping that maybe the 
> bug was in kafka. That didn't seem to help.
> I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
> maybe the bug was in the latest version. That didn't seem to help.
> I've added the zookeeper.connect config to the properties object, hoping that 
> the comment about it only being useful in 0.8 was wrong. That didn't seem to 
> help.
> I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea 
> drfloob). That didn't seem to help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492513#comment-15492513
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4617:


Hi [~mbarlocker], now that this is sorted out (on SO), I'm closing this ticket ;)

> Kafka & Flink duplicate messages on restart
> ---
>
> Key: FLINK-4617
> URL: https://issues.apache.org/jira/browse/FLINK-4617
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
> Environment: Ubuntu 16.04
> Flink 1.1.*
> Kafka 0.9.0.1
> Scala 2.11.7
> Java 1.8.0_91
>Reporter: Matthew Barlocker
>Priority: Critical
>
> [StackOverflow 
> Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]
> Flink (the kafka connector) re-runs the last 3-9 messages it saw before it 
> was shut down.
> *My code:*
> {code}
> import java.util.Properties
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.CheckpointingMode
> import org.apache.flink.streaming.connectors.kafka._
> import org.apache.flink.streaming.util.serialization._
> import org.apache.flink.runtime.state.filesystem._
> object Runner {
>   def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(500)
> env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
> 
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "testing");
> val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new 
> SimpleStringSchema(), properties)
> val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", 
> "testing-out", new SimpleStringSchema())
> env.addSource(kafkaConsumer)
>   .addSink(kafkaProducer)
> env.execute()
>   }
> }
> {code}
> *My sbt dependencies:*
> {code}
> libraryDependencies ++= Seq(
> "org.apache.flink" %% "flink-scala" % "1.1.2",
> "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
> "org.apache.flink" %% "flink-clients" % "1.1.2",
> "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
> "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
> )
> {code}
> *My process:*
> using 3 terminals:
> {code}
> TERM-1 start sbt, run program
> TERM-2 create kafka topics testing-in and testing-out
> TERM-2 run kafka-console-producer on testing-in topic
> TERM-3 run kafka-console-consumer on testing-out topic
> TERM-2 send data to kafka producer.
> Wait for a couple seconds (buffers need to flush)
> TERM-3 watch data appear in testing-out topic
> Wait for at least 500 milliseconds for checkpointing to happen
> TERM-1 stop sbt
> TERM-1 run sbt
> TERM-3 watch last few lines of data appear in testing-out topic
> {code}
> *My expectations:*
> When there are no errors in the system, I expect to be able to turn flink on 
> and off without reprocessing messages that successfully completed the stream 
> in a prior run.
> *My attempts to fix:*
> I've added the call to setStateBackend, thinking that perhaps the default 
> memory backend just didn't remember correctly. That didn't seem to help.
> I've removed the call to enableCheckpointing, hoping that perhaps there was a 
> separate mechanism to track state in Flink vs Zookeeper. That didn't seem to 
> help.
> I've used different sinks, RollingFileSink and print(), hoping that maybe the 
> bug was in kafka. That didn't seem to help.
> I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
> maybe the bug was in the latest version. That didn't seem to help.
> I've added the zookeeper.connect config to the properties object, hoping that 
> the comment about it only being useful in 0.8 was wrong. That didn't seem to 
> help.
> I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea 
> drfloob). That didn't seem to help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators

2016-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492504#comment-15492504
 ] 

ASF GitHub Bot commented on FLINK-4615:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2496
  
Updated PR. Handles reset() in a much better way. PageRankITCase exposed 
some cases. Also does the cleanup of the driver now.
Still wondering whether there is a reason why the driver was instantiated 
every time for an iterative task. Maybe I am missing the reason for that. If 
that is definitely needed, then this whole fix may not work. But the patch as such 
is better in terms of handling the reset() part.
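
As a purely hypothetical illustration of the idea (not the actual driver or iterative-task code in this PR), reusing one driver instance across iterations and resetting it instead of re-instantiating it:

{code}
// Hypothetical sketch: keep one driver per task and reset it between
// supersteps, instead of creating a new driver (and re-acquiring its
// managed memory) on every iteration.
public class ReusableDriverSketch {

    interface Driver {
        void allocateMemory();          // acquire managed memory once
        void reset();                   // clear per-iteration state, keep the memory
        void run() throws Exception;    // process one superstep
        void cleanup();                 // release memory when the task finishes
    }

    void runIterations(Driver driver, int supersteps) throws Exception {
        driver.allocateMemory();
        try {
            for (int i = 0; i < supersteps; i++) {
                driver.reset();         // reuse the allocated memory for the next pass
                driver.run();
            }
        } finally {
            driver.cleanup();           // clean up the driver exactly once
        }
    }
}
{code}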


> Reusing the memory allocated for the drivers and iterators
> --
>
> Key: FLINK-4615
> URL: https://issues.apache.org/jira/browse/FLINK-4615
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.0.0
>
>
> Raising as a subtask so that it can be committed individually and reviewed 
> more closely.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2496: FLINK-4615 Reusing the memory allocated for the drivers a...

2016-09-15 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2496
  
Updated PR. Handles reset() in a much better way. PageRankITCase exposed 
some cases. Also does the cleanup of the driver now.
Still wondering whether there is a reason why the driver was instantiated 
every time for an iterative task. Maybe I am missing the reason for that. If 
that is definitely needed, then this whole fix may not work. But the patch as such 
is better in terms of handling the reset() part.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

