[jira] [Updated] (FLINK-4627) Use Flink's PropertiesUtil in Kinesis connector to extract typed values from config properties
[ https://issues.apache.org/jira/browse/FLINK-4627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4627: --- Description: Right now, value extraction from config properties in the Kinesis connector uses the plain methods of {{java.util.Properties}} with string parsing. We can use Flink's utility {{PropertiesUtil}} instead, with fewer lines of code and better readability. was: Right now, value extraction from config properties in the Kinesis connector uses the plain methods of {{java.util.Properties}} with type casting. We can use Flink's utility {{PropertiesUtil}} instead, with fewer lines of code and better readability. > Use Flink's PropertiesUtil in Kinesis connector to extract typed values from > config properties > --- > > Key: FLINK-4627 > URL: https://issues.apache.org/jira/browse/FLINK-4627 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector > Reporter: Tzu-Li (Gordon) Tai > Priority: Trivial > Fix For: 1.2.0 > > > Right now, value extraction from config properties in the Kinesis connector uses > the plain methods of {{java.util.Properties}} with string parsing. > We can use Flink's utility {{PropertiesUtil}} instead, with fewer lines of code > and better readability. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4627) Use Flink's PropertiesUtil in Kinesis connector to extract typed values from config properties
Tzu-Li (Gordon) Tai created FLINK-4627: -- Summary: Use Flink's PropertiesUtil in Kinesis connector to extract typed values from config properties Key: FLINK-4627 URL: https://issues.apache.org/jira/browse/FLINK-4627 Project: Flink Issue Type: Improvement Components: Kinesis Connector Reporter: Tzu-Li (Gordon) Tai Priority: Trivial Fix For: 1.2.0 Right now, value extraction from config properties in the Kinesis connector uses the plain methods of {{java.util.Properties}} with type casting. We can use Flink's utility {{PropertiesUtil}} instead, with fewer lines of code and better readability.
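The before/after difference can be sketched in plain Java; the `getInt` helper below mirrors the kind of typed accessor {{PropertiesUtil}} provides, but its exact Flink signature and fallback behavior are assumed here, and the property key is made up for illustration:

```java
import java.util.Properties;

// Sketch of the refactoring the ticket proposes. getInt stands in for a
// PropertiesUtil-style typed accessor (assumed behavior: parse, fall back
// to the default on a missing or malformed value).
public class PropsExample {

    static int getInt(Properties config, String key, int defaultValue) {
        String val = config.getProperty(key);
        if (val == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(val);
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        // hypothetical key, only for illustration
        config.setProperty("flink.kinesis.getrecords.max", "150");

        // Before: manual string parsing at every call site.
        int maxBefore = Integer.parseInt(
            config.getProperty("flink.kinesis.getrecords.max", "100"));

        // After: one call, with a typed default.
        int maxAfter = getInt(config, "flink.kinesis.getrecords.max", 100);

        System.out.println(maxBefore == maxAfter); // both read 150
    }
}
```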
[jira] [Commented] (FLINK-4561) replace all the scala version as a `scala.binary.version` property
[ https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15495068#comment-15495068 ] ASF GitHub Bot commented on FLINK-4561: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2459 @StephanEwen Thanks for your reply, understood. > replace all the scala version as a `scala.binary.version` property > -- > > Key: FLINK-4561 > URL: https://issues.apache.org/jira/browse/FLINK-4561 > Project: Flink > Issue Type: Improvement > Reporter: shijinkui > > Replace every hard-coded Scala version (2.10) with a `scala.binary.version` > property defined in the root POM properties; the default Scala version property stays 2.10. > Changes: > 1. dependencies that include the Scala version > 2. module definitions that include the Scala version > 3. upgrade the Scala version from 2.11.7 to 2.11.8
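Concretely, the proposal amounts to defining the property once in the root POM and referencing it wherever a Scala-suffixed artifact name appears; the module shown below is just an example:

```xml
<!-- root pom.xml: define the property once; the default stays 2.10 -->
<properties>
  <scala.binary.version>2.10</scala.binary.version>
</properties>

<!-- any module: reference the property instead of a hard-coded "_2.10" suffix -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
</dependency>
```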
[GitHub] flink issue #2459: [FLINK-4561] replace all the scala version as a `scala.bi...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2459 @StephanEwen Thanks for your reply, understood. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4626) Missing break in MetricStore#add()
Ted Yu created FLINK-4626: - Summary: Missing break in MetricStore#add() Key: FLINK-4626 URL: https://issues.apache.org/jira/browse/FLINK-4626 Project: Flink Issue Type: Bug Reporter: Ted Yu {code} switch (info.getCategory()) { case INFO_CATEGORY_JM: addMetric(jobManager.metrics, name, metric); case INFO_CATEGORY_TM: String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID; {code} It looks like a break is missing after the addMetric(jobManager.metrics, name, metric) call, so the INFO_CATEGORY_JM case falls through into INFO_CATEGORY_TM.
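A self-contained illustration of the fall-through bug and its fix; the categories and "stores" below are simplified stand-ins for MetricStore's internals, not the actual Flink code:

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates the missing-break fall-through reported above, with plain
// strings standing in for the JM/TM metric stores.
public class FallThroughDemo {

    static final int INFO_CATEGORY_JM = 0;
    static final int INFO_CATEGORY_TM = 1;

    // Returns the stores a metric would land in for the given category.
    static List<String> add(int category) {
        List<String> touched = new ArrayList<>();
        switch (category) {
            case INFO_CATEGORY_JM:
                touched.add("jobManager");
                break; // the fix: without this, JM metrics fall through into the TM case
            case INFO_CATEGORY_TM:
                touched.add("taskManager");
                break;
        }
        return touched;
    }

    public static void main(String[] args) {
        // With the break in place, a JM metric touches only the JM store.
        System.out.println(add(INFO_CATEGORY_JM)); // [jobManager]
    }
}
```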
[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79040795 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java --- @@ -87,4 +87,6 @@ * @throws Exception Exceptions may be forwarded. */ void cancel() throws Exception; + + void resetForIterativeTasks() throws Exception; --- End diff -- For example, the various combine drivers seem to be easy to make resettable. Maybe this PR should just do that, so we maximize the chances that we can get the PR to be merged.
[jira] [Updated] (FLINK-4515) org.apache.flink.runtime.jobmanager.JobInfo class is not backwards compatible with 1.1 released version
[ https://issues.apache.org/jira/browse/FLINK-4515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-4515: - Component/s: JobManager > org.apache.flink.runtime.jobmanager.JobInfo class is not backwards compatible > with 1.1 released version > --- > > Key: FLINK-4515 > URL: https://issues.apache.org/jira/browse/FLINK-4515 > Project: Flink > Issue Type: Bug > Components: JobManager > Affects Versions: 1.2.0 > Reporter: Nagarjun Guraja > > The current org.apache.flink.runtime.jobmanager.JobInfo in the 1.2 trunk is not > backwards compatible, which breaks job recovery when upgrading to the latest > Flink build from the 1.1 release: > 2016-08-26 13:39:56,618 INFO org.apache.flink.runtime.jobmanager.JobManager > - Attempting to recover all jobs. > 2016-08-26 13:39:57,225 ERROR org.apache.flink.runtime.jobmanager.JobManager > - Fatal error: Failed to recover jobs. > java.io.InvalidClassException: org.apache.flink.runtime.jobmanager.JobInfo; > local class incompatible: stream classdesc serialVersionUID = > 4102282956967236682, local class serialVersionUID = -2377916285980374169 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) > at > org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58) > 
at > org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35) > at > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:543) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:539) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:539) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:539) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:535) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:535) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494273#comment-15494273 ] ASF GitHub Bot commented on FLINK-4615: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79040795 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java --- @@ -87,4 +87,6 @@ * @throws Exception Exceptions may be forwarded. */ void cancel() throws Exception; + + void resetForIterativeTasks() throws Exception; --- End diff -- For example, the various combine drivers seem to be easy to make resettable. Maybe this PR should just do that, so we maximize the chances that we can get the PR to be merged. > Reusing the memory allocated for the drivers and iterators > -- > > Key: FLINK-4615 > URL: https://issues.apache.org/jira/browse/FLINK-4615 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime > Reporter: ramkrishna.s.vasudevan > Assignee: ramkrishna.s.vasudevan > Fix For: 1.0.0 > > > Raising as a sub-task so that the changes can be committed individually and > reviewed more closely.
[jira] [Updated] (FLINK-4515) org.apache.flink.runtime.jobmanager.JobInfo class is not backwards compatible with 1.1 released version
[ https://issues.apache.org/jira/browse/FLINK-4515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-4515: - Affects Version/s: 1.2.0 > org.apache.flink.runtime.jobmanager.JobInfo class is not backwards compatible > with 1.1 released version > --- > > Key: FLINK-4515 > URL: https://issues.apache.org/jira/browse/FLINK-4515 > Project: Flink > Issue Type: Bug > Affects Versions: 1.2.0 > Reporter: Nagarjun Guraja > > The current org.apache.flink.runtime.jobmanager.JobInfo in the 1.2 trunk is not > backwards compatible, which breaks job recovery when upgrading to the latest > Flink build from the 1.1 release: > 2016-08-26 13:39:56,618 INFO org.apache.flink.runtime.jobmanager.JobManager > - Attempting to recover all jobs. > 2016-08-26 13:39:57,225 ERROR org.apache.flink.runtime.jobmanager.JobManager > - Fatal error: Failed to recover jobs. > java.io.InvalidClassException: org.apache.flink.runtime.jobmanager.JobInfo; > local class incompatible: stream classdesc serialVersionUID = > 4102282956967236682, local class serialVersionUID = -2377916285980374169 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) > at > org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58) > at > 
org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35) > at > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:543) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:539) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:539) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:539) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:535) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:535) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
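The `InvalidClassException` above is Java serialization's default versioning at work: when a `Serializable` class declares no explicit `serialVersionUID`, the JVM derives one from the class's structure, so any field or method change invalidates previously serialized instances. A generic sketch of the usual remedy (this is not Flink's actual `JobInfo` class):

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Pinning serialVersionUID keeps old serialized instances readable across
// compatible class changes; leaving it implicit is what produced the
// mismatched UIDs in the log above. Fields here are illustrative.
public class JobInfoSketch implements Serializable {

    private static final long serialVersionUID = 1L; // arbitrary, but must never change

    private final String client;
    private final long startTimestamp;

    public JobInfoSketch(String client, long startTimestamp) {
        this.client = client;
        this.startTimestamp = startTimestamp;
    }

    public static void main(String[] args) {
        // The declared UID is what gets written to the stream, regardless of
        // how the class's fields evolve later.
        System.out.println(ObjectStreamClass.lookup(JobInfoSketch.class).getSerialVersionUID()); // 1
    }
}
```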
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494252#comment-15494252 ] ASF GitHub Bot commented on FLINK-4615: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79038567 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java --- @@ -87,4 +87,6 @@ * @throws Exception Exceptions may be forwarded. */ void cancel() throws Exception; + + void resetForIterativeTasks() throws Exception; --- End diff -- Currently, the `ResettableDriver` interface extends the `Driver` interface. I would say that for the moment, let's just start by making some of the drivers resettable in this PR, and keep both interfaces around. Then, when this PR is merged, a second PR could make all the remaining drivers resettable as well. And a third one could merge the functionality of `ResettableDriver` into `Driver`, and thus get rid of `ResettableDriver`. > Reusing the memory allocated for the drivers and iterators > -- > > Key: FLINK-4615 > URL: https://issues.apache.org/jira/browse/FLINK-4615 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime > Reporter: ramkrishna.s.vasudevan > Assignee: ramkrishna.s.vasudevan > Fix For: 1.0.0 > > > Raising as a sub-task so that the changes can be committed individually and > reviewed more closely.
[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79038567 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java --- @@ -87,4 +87,6 @@ * @throws Exception Exceptions may be forwarded. */ void cancel() throws Exception; + + void resetForIterativeTasks() throws Exception; --- End diff -- Currently, the `ResettableDriver` interface extends the `Driver` interface. I would say that for the moment, let's just start by making some of the drivers resettable in this PR, and keep both interfaces around. Then, when this PR is merged, a second PR could make all the remaining drivers resettable as well. And a third one could merge the functionality of `ResettableDriver` into `Driver`, and thus get rid of `ResettableDriver`.
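For readers following the review thread, the interface relationship under discussion looks roughly like this; the method lists are abbreviated and the generics simplified relative to Flink's real `Driver` interface:

```java
// Simplified sketch of the two interfaces debated in the review above;
// the real Flink Driver has more methods and an extra type parameter.
interface Driver<OT> {
    void run() throws Exception;
    void cancel() throws Exception;
    // the method added in the diff under review
    void resetForIterativeTasks() throws Exception;
}

// ResettableDriver extends Driver today; the third follow-up PR proposed
// above would fold its reset functionality into Driver and retire it.
interface ResettableDriver<OT> extends Driver<OT> {
    void reset() throws Exception;
    void teardown() throws Exception;
}

// A no-op implementation, just to show that a resettable driver remains
// usable anywhere a plain Driver is expected.
class NoOpDriver implements ResettableDriver<Object> {
    public void run() {}
    public void cancel() {}
    public void resetForIterativeTasks() {}
    public void reset() {}
    public void teardown() {}

    public static void main(String[] args) throws Exception {
        Driver<Object> d = new NoOpDriver();
        d.resetForIterativeTasks(); // no-op here
        System.out.println(d instanceof ResettableDriver); // true
    }
}
```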
[jira] [Commented] (FLINK-4625) Guard Flink processes against blocking shutdown hooks
[ https://issues.apache.org/jira/browse/FLINK-4625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494110#comment-15494110 ] ASF GitHub Bot commented on FLINK-4625: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/2503 [FLINK-4625] [core] Add a safety net to forcibly terminate JVM if clean shutdown freezes. Resource managers like YARN send the JVM the `SIGTERM` signal to kill the process. With `SIGTERM`, the JVM shutdown hooks run, and may cause the process to freeze up during shutdown. Especially since all dependencies (like Hadoop) may install shutdown hooks (and do so), it is not in Flink's control to make sure all shutdown hooks are well behaved and never lock up the JVM shutdown. This pull request adds a shutdown hook that calls `Runtime.halt()` after a delay. This forcibly terminates the JVM if clean shutdown does not succeed within a certain time (default is five seconds). The pull request also adds tests that validate the behavior of JVM shutdown lockups and that the safety net ensures the process really shuts down. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink process_self_kill Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2503.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2503 commit d5b9860773ec7aaf0b238544b794a10012d8dda5 Author: Stephan Ewen Date: 2016-09-15T17:27:06Z [FLINK-4625] [core] Add a safety net to forcibly terminate JVM if clean shutdown freezes. 
> Guard Flink processes against blocking shutdown hooks > - > > Key: FLINK-4625 > URL: https://issues.apache.org/jira/browse/FLINK-4625 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination > Affects Versions: 1.1.2 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 1.2.0 > > > Resource managers like YARN send the JVM the {{SIGTERM}} signal when they want > to terminate a process. > With {{SIGTERM}}, the JVM shutdown hooks run, and may cause the process to > freeze up on shutdown. Especially since all dependencies (like Hadoop) may > install shutdown hooks (and do so), it is not in Flink's control to make sure > all shutdown hooks are well behaved. > I propose to add a guard that forcibly terminates the JVM if clean shutdown > does not succeed within a certain time (say, five seconds).
[GitHub] flink pull request #2503: [FLINK-4625] [core] Add a safety net to forcibly t...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/2503 [FLINK-4625] [core] Add a safety net to forcibly terminate JVM if clean shutdown freezes. Resource managers like YARN send the JVM the `SIGTERM` signal to kill the process. With `SIGTERM`, the JVM shutdown hooks run, and may cause the process to freeze up during shutdown. Especially since all dependencies (like Hadoop) may install shutdown hooks (and do so), it is not in Flink's control to make sure all shutdown hooks are well behaved and never lock up the JVM shutdown. This pull request adds a shutdown hook that calls `Runtime.halt()` after a delay. This forcibly terminates the JVM if clean shutdown does not succeed within a certain time (default is five seconds). The pull request also adds tests that validate the behavior of JVM shutdown lockups and that the safety net ensures the process really shuts down. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink process_self_kill Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2503.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2503 commit d5b9860773ec7aaf0b238544b794a10012d8dda5 Author: Stephan Ewen Date: 2016-09-15T17:27:06Z [FLINK-4625] [core] Add a safety net to forcibly terminate JVM if clean shutdown freezes.
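The mechanism the PR text describes can be sketched as follows; the class and thread names here are illustrative stand-ins, not Flink's actual code:

```java
// Sketch of a shutdown safety net: a regular shutdown hook arms a daemon
// watchdog thread, and if the remaining hooks do not finish within the
// timeout, the watchdog forcibly halts the JVM.
public class ShutdownSafetyNet {

    /** Builds the watchdog thread; in real use it is started from a shutdown hook. */
    static Thread newWatchdog(long timeoutMillis, Runtime runtime) {
        Thread watchdog = new Thread(() -> {
            try {
                Thread.sleep(timeoutMillis);
                runtime.halt(-17); // hard stop: skips remaining shutdown hooks and finalizers
            } catch (InterruptedException e) {
                // clean shutdown completed in time; nothing to do
            }
        }, "shutdown-watchdog");
        watchdog.setDaemon(true); // must not itself keep the JVM alive
        return watchdog;
    }

    /** Installs a hook that arms the watchdog once shutdown begins. */
    public static void install(long timeoutMillis) {
        Runtime runtime = Runtime.getRuntime();
        runtime.addShutdownHook(new Thread(
            () -> newWatchdog(timeoutMillis, runtime).start(), "shutdown-watchdog-armer"));
    }

    public static void main(String[] args) {
        Thread w = newWatchdog(5000, Runtime.getRuntime());
        System.out.println(w.isDaemon()); // true; not yet started, so it halts nothing
    }
}
```

The key design point from the PR is that `Runtime.halt()` (unlike `System.exit()`) does not run shutdown hooks, so the watchdog cannot itself be blocked by the misbehaving hooks it guards against.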
[GitHub] flink issue #2498: [FLINK-4619] - JobManager does not answer to client when ...
Github user mproch commented on the issue: https://github.com/apache/flink/pull/2498 Yes, it's a bit more tricky than I expected... These tests probably also need to be changed, because they relied on the previous behaviour... What's more, with the change some checks on FlinkMiniCluster become non-deterministic... I'll commit fixes once I fully understand how it works...
[jira] [Commented] (FLINK-4619) JobManager does not answer to client when restore from savepoint fails
[ https://issues.apache.org/jira/browse/FLINK-4619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494099#comment-15494099 ] ASF GitHub Bot commented on FLINK-4619: --- Github user mproch commented on the issue: https://github.com/apache/flink/pull/2498 Yes, it's a bit more tricky than I expected... These tests probably also need to be changed, because they relied on the previous behaviour... What's more, with the change some checks on FlinkMiniCluster become non-deterministic... I'll commit fixes once I fully understand how it works... > JobManager does not answer to client when restore from savepoint fails > -- > > Key: FLINK-4619 > URL: https://issues.apache.org/jira/browse/FLINK-4619 > Project: Flink > Issue Type: Bug > Affects Versions: 1.1.0, 1.1.1, 1.1.2 > Reporter: Maciej Prochniak > Fix For: 1.2.0, 1.1.3 > > > When the savepoint used is incompatible with the currently deployed process, the > job manager never responds (jobInfo.notifyClients is not invoked in one of the > try-catch blocks)
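The failure mode in the description can be sketched in miniature: if the client notification only happens on the success path, a restore failure leaves the client waiting forever. The `notifyClients` name mirrors the issue text, but the types below are illustrative stand-ins, not the JobManager's actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Miniature of the bug and its fix: the catch branch must answer the
// client too, otherwise a failed savepoint restore never responds.
public class SavepointRestoreSketch {

    final List<String> notifications = new ArrayList<>();

    void notifyClients(String message) {
        notifications.add(message);
    }

    void submitJob(boolean savepointCompatible) {
        try {
            if (!savepointCompatible) {
                throw new IllegalStateException("savepoint incompatible with deployed job");
            }
            notifyClients("SUCCESS");
        } catch (Exception e) {
            // the missing piece described in the report: answer the client on failure too
            notifyClients("FAILURE: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        SavepointRestoreSketch jm = new SavepointRestoreSketch();
        jm.submitJob(false);
        System.out.println(jm.notifications); // one FAILURE notification, not silence
    }
}
```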
[jira] [Commented] (FLINK-4619) JobManager does not answer to client when restore from savepoint fails
[ https://issues.apache.org/jira/browse/FLINK-4619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494086#comment-15494086 ] ASF GitHub Bot commented on FLINK-4619: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2498 Good idea. Unfortunately, the change broke some tests: - `SavepointITCase.testSubmitWithUnknownSavepointPath` - `RescalingITCase.testSavepointRescalingFailureWithNonPartitionedState` You can see more in the Travis CI report. > JobManager does not answer to client when restore from savepoint fails > -- > > Key: FLINK-4619 > URL: https://issues.apache.org/jira/browse/FLINK-4619 > Project: Flink > Issue Type: Bug > Affects Versions: 1.1.0, 1.1.1, 1.1.2 > Reporter: Maciej Prochniak > Fix For: 1.2.0, 1.1.3 > > > When the savepoint used is incompatible with the currently deployed process, the > job manager never responds (jobInfo.notifyClients is not invoked in one of the > try-catch blocks)
[GitHub] flink issue #2498: [FLINK-4619] - JobManager does not answer to client when ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2498 Good idea. Unfortunately, the change broke some tests: - `SavepointITCase.testSubmitWithUnknownSavepointPath` - `RescalingITCase.testSavepointRescalingFailureWithNonPartitionedState` You can see more in the Travis CI report.
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494054#comment-15494054 ] ASF GitHub Bot commented on FLINK-4615: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79021152 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java --- @@ -87,4 +87,6 @@ * @throws Exception Exceptions may be forwarded. */ void cancel() throws Exception; + + void resetForIterativeTasks() throws Exception; --- End diff -- So could we have only ResettableDriver, with no need for the Driver interface then? > Reusing the memory allocated for the drivers and iterators > -- > > Key: FLINK-4615 > URL: https://issues.apache.org/jira/browse/FLINK-4615 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime > Reporter: ramkrishna.s.vasudevan > Assignee: ramkrishna.s.vasudevan > Fix For: 1.0.0 > > > Raising as a sub-task so that the changes can be committed individually and > reviewed more closely.
[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79021152 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java --- @@ -87,4 +87,6 @@ * @throws Exception Exceptions may be forwarded. */ void cancel() throws Exception; + + void resetForIterativeTasks() throws Exception; --- End diff -- So could we have only ResettableDriver, with no need for the Driver interface then?
[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem
[ https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494047#comment-15494047 ] ASF GitHub Bot commented on FLINK-4485: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2499 Looks good to me. +1 to merge > Finished jobs in yarn session fill /tmp filesystem > -- > > Key: FLINK-4485 > URL: https://issues.apache.org/jira/browse/FLINK-4485 > Project: Flink > Issue Type: Bug > Components: JobManager > Affects Versions: 1.1.0 > Reporter: Niels Basjes > Assignee: Maximilian Michels > Priority: Blocker > > On a Yarn cluster I start a yarn-session with a few containers and task slots. > Then I fire a 'large' number of Flink batch jobs in sequence against this > yarn session. It is the exact same job (java code), yet it gets different > parameters. > In this scenario it is exporting HBase tables to files in HDFS, and the > parameters say which data from which tables to export and the name of the target > directory. > After running several dozen jobs, job submission started to fail and we > investigated. > We found that the cause was that on the Yarn node which was hosting the > jobmanager the /tmp file system was full (4GB was 100% full). > However, the output of {{du -hcs /tmp}} showed only 200MB in use. > We found that a very large file (we guess it is the jar of the job) was put > in /tmp, used, and deleted, yet the file handle was not closed by the jobmanager. > As soon as we killed the jobmanager the disk space was freed. > In summary, the impact is that a yarn-session that receives enough > jobs brings down the Yarn node for all users. > See parts of the output we got from {{lsof}} below. 
> {code}
> COMMAND   PID    USER   FD   TYPE DEVICE     SIZE NODE NAME
> java    15034 nbasjes  550r   REG 253,17 66219695  245 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 (deleted)
> java    15034 nbasjes  551r   REG 253,17 66219695  252 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 (deleted)
> java    15034 nbasjes  552r   REG 253,17 66219695  267 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 (deleted)
> java    15034 nbasjes  553r   REG 253,17 66219695  250 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 (deleted)
> java    15034 nbasjes  554r   REG 253,17 66219695  288 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 (deleted)
> java    15034 nbasjes  555r   REG 253,17 66219695  298 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0025 (deleted)
> java    15034 nbasjes  557r   REG 253,17 66219695  254 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0008 (deleted)
> java    15034 nbasjes  558r   REG 253,17 66219695  292 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0019 (deleted)
> java    15034 nbasjes  559r   REG 253,17 66219695  275 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0013 (deleted)
> java    15034 nbasjes  560r   REG 253,17 66219695  159 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0002 (deleted)
> java    15034 nbasjes  562r   REG 253,17 66219695  238 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0001 (deleted)
> java    15034 nbasjes  568r   REG 253,17 66219695  246 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0004 (deleted)
> java    15034 nbasjes  569r   REG 253,17 66219695  255 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0009 (deleted)
> java    15034 nbasjes  571r   REG 253,17 66219695  299 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0026 (deleted)
> java    15034 nbasjes  572r   REG 253,17 66219695  293 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0020 (deleted)
> java    15034 nbasjes  574r   REG 253,17 66219695  256 /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0010 (deleted)
> java    15034 nbasjes  575r   REG 253,17 66219695  302 >
[GitHub] flink issue #2499: [FLINK-4485] close and remove user class loader after job...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2499 Looks good to me. +1 to merge --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2501: [FLINK-4622] CLI help message should include 'savepoint' ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2501 Thanks for the correction. +1 to merge this!
[jira] [Commented] (FLINK-4622) CLI help message should include 'savepoint' action
[ https://issues.apache.org/jira/browse/FLINK-4622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494044#comment-15494044 ] ASF GitHub Bot commented on FLINK-4622: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2501 Thanks for the correction. +1 to merge this! > CLI help message should include 'savepoint' action > -- > > Key: FLINK-4622 > URL: https://issues.apache.org/jira/browse/FLINK-4622 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.1.2 >Reporter: Scott Kidder >Assignee: Scott Kidder >Priority: Trivial > Original Estimate: 1h > Remaining Estimate: 1h > > The Flink CLI help message should include the 'savepoint' action in the list > of available actions. It currently looks like: > {code} > bash-4.3# flink foo > "foo" is not a valid action. > Valid actions are "run", "list", "info", "stop", or "cancel". > Specify the version option (-v or --version) to print Flink version. > Specify the help option (-h or --help) to get help on the command. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
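The omission is easy to make because the list of actions is spelled out by hand in the help text. A small sketch (hypothetical class and method names, not Flink's actual {{CliFrontend}} code) shows how deriving the message from a single action list keeps the help text from drifting out of date when an action such as "savepoint" is added:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not Flink's actual CLI code: build the
// "valid actions" line from one canonical list, so adding an action
// (e.g. "savepoint") automatically updates the help message.
public class CliHelp {
    static final List<String> ACTIONS =
        List.of("run", "list", "info", "stop", "cancel", "savepoint");

    static String validActionsMessage() {
        // Quote each action and join them into a single sentence.
        String quoted = ACTIONS.stream()
            .map(a -> '"' + a + '"')
            .collect(Collectors.joining(", "));
        return "Valid actions are " + quoted + ".";
    }

    public static void main(String[] args) {
        System.out.println(validActionsMessage());
    }
}
```

With this structure, the "foo is not a valid action" error path and the help text can both read from {{ACTIONS}}, so they can never disagree.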
[jira] [Created] (FLINK-4625) Guard Flink processes against blocking shutdown hooks
Stephan Ewen created FLINK-4625: --- Summary: Guard Flink processes against blocking shutdown hooks Key: FLINK-4625 URL: https://issues.apache.org/jira/browse/FLINK-4625 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.1.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.2.0 Resource managers like YARN send the JVM a {{SIGTERM}} signal when they want to terminate a process. On {{SIGTERM}}, the JVM runs its shutdown hooks, which may cause the process to freeze up during shutdown. Since dependencies (such as Hadoop) may install shutdown hooks of their own (and do so), it is not in Flink's control to make sure all shutdown hooks are well behaved. I propose to add a guard that forcibly terminates the JVM if clean shutdown does not succeed within a certain time (say, five seconds).
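A minimal sketch of such a guard (hypothetical names and exit code, not Flink's eventual implementation) is a daemon watchdog thread that calls {{Runtime.halt()}}, which terminates the JVM immediately and bypasses any remaining shutdown hooks, unless clean shutdown cancels it within the grace period:

```java
// Hypothetical sketch of the proposed shutdown guard; the class name,
// thread name and exit code are illustrative only.
public class ShutdownGuardDemo {

    /** Starts a daemon watchdog that forcibly halts the JVM after the timeout. */
    static Thread startGuard(long timeoutMillis) {
        Thread guard = new Thread(() -> {
            try {
                Thread.sleep(timeoutMillis);
                // halt() exits immediately and skips all remaining shutdown hooks,
                // unlike System.exit(), which would run them.
                Runtime.getRuntime().halt(-17);
            } catch (InterruptedException ignored) {
                // Interrupted: clean shutdown finished in time, do nothing.
            }
        }, "jvm-shutdown-guard");
        guard.setDaemon(true);   // never keeps the JVM alive on its own
        guard.start();
        return guard;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread guard = startGuard(5000);  // five-second grace period
        Thread.sleep(100);                // simulate well-behaved shutdown hooks
        guard.interrupt();                // hooks completed: cancel the guard
        guard.join();
        System.out.println("clean shutdown completed");
    }
}
```

In the real fix the guard would be armed when {{SIGTERM}} arrives, so a blocking shutdown hook installed by a dependency can delay the process by at most the grace period.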
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494022#comment-15494022 ] ASF GitHub Bot commented on FLINK-4615: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79017956 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java --- @@ -87,4 +87,6 @@ * @throws Exception Exceptions may be forwarded. */ void cancel() throws Exception; + + void resetForIterativeTasks() throws Exception; --- End diff -- Ok. Let me update the code accordingly. Of course it is simple then. My only concern was I was not sure whether every driver can be made of type ResettableDriver when it was a special case for certain drivers. > Reusing the memory allocated for the drivers and iterators > -- > > Key: FLINK-4615 > URL: https://issues.apache.org/jira/browse/FLINK-4615 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > Fix For: 1.0.0 > > > Raising as a subtask so that it can be committed individually and reviewed more closely.
[GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79017956 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java --- @@ -87,4 +87,6 @@ * @throws Exception Exceptions may be forwarded. */ void cancel() throws Exception; + + void resetForIterativeTasks() throws Exception; --- End diff -- Ok. Let me update the code accordingly. Of course it is simple then. My only concern was I was not sure whether every driver can be made of type ResettableDriver when it was a special case for certain drivers.
[jira] [Closed] (FLINK-4389) Expose metrics to Webfrontend
[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-4389. --- Resolution: Fixed Fix Version/s: 1.2.0 Implemented in 70704de0c82cbb7b143dd696221e11999feb3600 > Expose metrics to Webfrontend > - > > Key: FLINK-4389 > URL: https://issues.apache.org/jira/browse/FLINK-4389 > Project: Flink > Issue Type: Sub-task > Components: Metrics, Webfrontend >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.2.0, pre-apache > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface
[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend
[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494002#comment-15494002 ] ASF GitHub Bot commented on FLINK-4389: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2363 > Expose metrics to Webfrontend > - > > Key: FLINK-4389 > URL: https://issues.apache.org/jira/browse/FLINK-4389 > Project: Flink > Issue Type: Sub-task > Components: Metrics, Webfrontend >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: pre-apache, 1.2.0 > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface
[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2363
[jira] [Commented] (FLINK-4589) Fix Merging of Covering Window in MergingWindowSet
[ https://issues.apache.org/jira/browse/FLINK-4589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493989#comment-15493989 ] ASF GitHub Bot commented on FLINK-4589: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2476 Looks good to me, merging this... > Fix Merging of Covering Window in MergingWindowSet > -- > > Key: FLINK-4589 > URL: https://issues.apache.org/jira/browse/FLINK-4589 > Project: Flink > Issue Type: Bug > Components: Windowing Operators >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.0.4, 1.2.0, 1.1.3 > > > Right now, when a new window gets merged that covers all of the existing > windows, {{MergingWindowSet}} does not correctly set the state window.
[GitHub] flink issue #2476: [FLINK-4589] Fix Merging of Covering Window in MergingWin...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2476 Looks good to me, merging this...
[jira] [Commented] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493956#comment-15493956 ] Matthew Barlocker commented on FLINK-4618: -- Nice find [~melmoth] and [~tzulitai] - I don't know the code well enough to feel confident making the change or testing the fix. Please feel free. I started learning Flink about a week ago, and have put a solid 3 hours into it. > FlinkKafkaConsumer09 should start from the next record on startup from > offsets in Kafka > --- > > Key: FLINK-4618 > URL: https://issues.apache.org/jira/browse/FLINK-4618 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.2 > Environment: Flink 1.1.2 > Kafka Broker 0.10.0 > Hadoop 2.7.0 >Reporter: Melmoth > Fix For: 1.2.0, 1.1.3 > > > **Original reported ticket title: Last kafka message gets consumed twice when > restarting job** > There seems to be an issue with the offset management in Flink. When a job is > stopped and started again, a message from the previous offset is read again. > I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new > consumer group and emitted one record. > You can clearly see that the consumer waits for a new record at offset > 4848911, which is correct. After restarting, it consumes a record at 4848910, > causing the record to be consumed more than once. > I checked the offset with the Kafka CMD tools; the committed offset in > zookeeper is 4848910. > Here is my log output: > {code} > 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient >- Initiating connection to node 2147482646 at hdp1:6667. 
> 10:29:24,225 DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching > committed offsets for partitions: [myTopic-0] > 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient >- Completed connection to node 2147482646 > 10:29:24,234 DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No > committed offset for partition myTopic-0 > 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher >- Resetting offset for partition myTopic-0 to latest offset. > 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher >- Fetched offset 4848910 for partition myTopic-0 > 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > -- Inserting a new event here > 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Adding fetched record for partition myTopic-0 with offset 4848910 to > buffered record list > 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Returning fetched records at offset 4848910 for assigned partition > myTopic-0 and update position to 4848911 > 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Triggering checkpoint 6 @ 1473841823887 > 10:30:23,957 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Completed checkpoint 6 (in 96 ms) > 10:30:24,196 TRACE > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending > offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, > metadata=''}} to Node(2147482646, hdp1, 6667) > 10:30:24,204 DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed > offset 4848910 for partition myTopic-0 > 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:48,057 INFO
[jira] [Comment Edited] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493927#comment-15493927 ] Tzu-Li (Gordon) Tai edited comment on FLINK-4618 at 9/15/16 5:04 PM: - I've confirmed that the problem only exists in the 0.9 consumer. [~melmoth] and [~mbarlocker], since you were the original reporters of the bug, would any one of you want to work on fixing it and open a PR? You can ping me to help review when you're ready :) Otherwise I can also pick it up. In any case, let me know ;) was (Author: tzulitai): I've confirmed that the problem only exists in the 0.9 consumer. [~melmoth] and [~mbarlocker], since you were the original reporters of the bug, would you want to work on fixing it and open a PR? You can ping me to help review when you're ready :) Otherwise I can also pick it up. In any case, let me know ;) > FlinkKafkaConsumer09 should start from the next record on startup from > offsets in Kafka > --- > > Key: FLINK-4618 > URL: https://issues.apache.org/jira/browse/FLINK-4618 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.2 > Environment: Flink 1.1.2 > Kafka Broker 0.10.0 > Hadoop 2.7.0 >Reporter: Melmoth > Fix For: 1.2.0, 1.1.3 > > > **Original reported ticket title: Last kafka message gets consumed twice when > restarting job** > There seem to be an issue with the offset management in Flink. When a job is > stopped and startet again, a message from the previous offset is read again. > I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new > consumer group and emitted one record. > You can cleary see, that the consumer waits for a new record at offset > 4848911, which is correct. After restarting, it consumes a record at 4848910, > causing the record to be consumed more than once. > I checked the offset with the Kafka CMD tools, the commited offset in > zookeeper is 4848910. 
> Here is my log output: > {code} > 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient >- Initiating connection to node 2147482646 at hdp1:6667. > 10:29:24,225 DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching > committed offsets for partitions: [myTopic-0] > 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient >- Completed connection to node 2147482646 > 10:29:24,234 DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No > committed offset for partition myTopic-0 > 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher >- Resetting offset for partition myTopic-0 to latest offset. > 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher >- Fetched offset 4848910 for partition myTopic-0 > 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > -- Inserting a new event here > 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Adding fetched record for partition myTopic-0 with offset 4848910 to > buffered record list > 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Returning fetched records at offset 4848910 for assigned partition > myTopic-0 and update position to 4848911 > 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 
4848911 > 10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Triggering checkpoint 6 @ 1473841823887 > 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Completed checkpoint 6 (in 96 ms) > 10:30:24,196 TRACE > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending > offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, > metadata=''}} to Node(2147482646,
[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart
[ https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493950#comment-15493950 ] Tzu-Li (Gordon) Tai commented on FLINK-4617: Revisited and reconfirmed that this is actually a bug in the 0.9 consumer (sorry for mis-concluding in the first place). Please see FLINK-4618. > Kafka & Flink duplicate messages on restart > --- > > Key: FLINK-4617 > URL: https://issues.apache.org/jira/browse/FLINK-4617 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2 > Environment: Ubuntu 16.04 > Flink 1.1.* > Kafka 0.9.0.1 > Scala 2.11.7 > Java 1.8.0_91 >Reporter: Matthew Barlocker >Priority: Critical > > [StackOverflow > Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart] > Flink (the kafka connector) re-runs the last 3-9 messages it saw before it > was shut down. > *My code:* > {code} > import java.util.Properties > import org.apache.flink.streaming.api.windowing.time.Time > import org.apache.flink.streaming.api.scala._ > import org.apache.flink.streaming.api.CheckpointingMode > import org.apache.flink.streaming.connectors.kafka._ > import org.apache.flink.streaming.util.serialization._ > import org.apache.flink.runtime.state.filesystem._ > object Runner { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.enableCheckpointing(500) > env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints")) > > env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) > val properties = new Properties() > properties.setProperty("bootstrap.servers", "localhost:9092"); > properties.setProperty("group.id", "testing"); > val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new > SimpleStringSchema(), properties) > val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", > 
"testing-out", new SimpleStringSchema()) > env.addSource(kafkaConsumer) > .addSink(kafkaProducer) > env.execute() > } > } > {code} > *My sbt dependencies:* > {code} > libraryDependencies ++= Seq( > "org.apache.flink" %% "flink-scala" % "1.1.2", > "org.apache.flink" %% "flink-streaming-scala" % "1.1.2", > "org.apache.flink" %% "flink-clients" % "1.1.2", > "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2", > "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2" > ) > {code} > *My process:* > using 3 terminals: > {code} > TERM-1 start sbt, run program > TERM-2 create kafka topics testing-in and testing-out > TERM-2 run kafka-console-producer on testing-in topic > TERM-3 run kafka-console-consumer on testing-out topic > TERM-2 send data to kafka producer. > Wait for a couple seconds (buffers need to flush) > TERM-3 watch data appear in testing-out topic > Wait for at least 500 milliseconds for checkpointing to happen > TERM-1 stop sbt > TERM-1 run sbt > TERM-3 watch last few lines of data appear in testing-out topic > {code} > *My expectations:* > When there are no errors in the system, I expect to be able to turn flink on > and off without reprocessing messages that successfully completed the stream > in a prior run. > *My attempts to fix:* > I've added the call to setStateBackend, thinking that perhaps the default > memory backend just didn't remember correctly. That didn't seem to help. > I've removed the call to enableCheckpointing, hoping that perhaps there was a > separate mechanism to track state in Flink vs Zookeeper. That didn't seem to > help. > I've used different sinks, RollingFileSink, print(); hoping that maybe the > bug was in kafka. That didn't seem to help. > I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that > maybe the bug was in the latest version. That didn't seem to help. 
> I've added the zookeeper.connect config to the properties object, hoping that > the comment about it only being useful in 0.8 was wrong. That didn't seem to > help. > I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea > drfloob). That didn't seem to help.
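The behaviour reported here is consistent with the off-by-one diagnosed in FLINK-4618: in Kafka's convention a committed offset names the *next* record to read, not the last record processed, so a consumer that commits the offset of the last processed record re-reads it on restart. A toy sketch (hypothetical model with no Kafka dependency, purely to illustrate the two interpretations):

```java
import java.util.List;

// Toy model of Kafka committed-offset semantics, illustrating why
// committing the offset of the last *processed* record (instead of
// that offset + 1) yields one duplicate per partition on restart.
public class OffsetSemanticsDemo {

    /** Records a restarted consumer would read, starting from startOffset. */
    static List<String> restartFrom(List<String> log, int startOffset) {
        return log.subList(startOffset, log.size());
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c");   // offsets 0, 1, 2
        int lastProcessed = 2;                       // record "c" was processed

        // Buggy commit: the last processed offset itself -> "c" is re-read.
        System.out.println(restartFrom(log, lastProcessed));

        // Correct commit: lastProcessed + 1 -> nothing is re-read.
        System.out.println(restartFrom(log, lastProcessed + 1));
    }
}
```

This also matches the log excerpt above: Flink commits offset 4848910 (the record just read) where Kafka's convention would expect 4848911.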
[jira] [Created] (FLINK-4624) Gelly's summarization algorithm cannot deal with null vertex group values
Till Rohrmann created FLINK-4624: Summary: Gelly's summarization algorithm cannot deal with null vertex group values Key: FLINK-4624 URL: https://issues.apache.org/jira/browse/FLINK-4624 Project: Flink Issue Type: Bug Components: Gelly Reporter: Till Rohrmann Fix For: 1.2.0 Gelly's {{Summarization}} algorithm cannot handle null values in {{VertexGroupItem.f2}}. This behaviour is hidden by using Strings as a vertex value in the {{SummarizationITCase}}, because the {{StringSerializer}} can handle null values.
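The usual way a serializer is made null-tolerant is to write a presence flag before the value, which is why a String-valued test masks the bug while other value types fail. A hedged sketch of that pattern (hypothetical helper methods, not Gelly's or Flink's actual serializer API):

```java
import java.io.*;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical sketch: wrapping any value writer/reader with a presence
// flag so that null round-trips safely (not Flink's actual serializers).
public class NullableSerialization {

    static <T> void writeNullable(DataOutput out, T value,
                                  BiConsumer<DataOutput, T> writer) throws IOException {
        out.writeBoolean(value != null);   // presence flag first
        if (value != null) {
            writer.accept(out, value);     // only then the actual value
        }
    }

    static <T> T readNullable(DataInput in,
                              Function<DataInput, T> reader) throws IOException {
        return in.readBoolean() ? reader.apply(in) : null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);

        // A null value never reaches the inner writer.
        writeNullable(out, null, (o, v) -> { throw new AssertionError("unreachable"); });

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(readNullable(in, i -> "never called"));
    }
}
```

A serializer without such a flag (or an equivalent null marker) throws as soon as a vertex group value is null, which is exactly what a String-only test cannot detect.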
[jira] [Updated] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4618: --- Fix Version/s: 1.1.3 1.2.0 > FlinkKafkaConsumer09 should start from the next record on startup from > offsets in Kafka > --- > > Key: FLINK-4618 > URL: https://issues.apache.org/jira/browse/FLINK-4618 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.2 > Environment: Flink 1.1.2 > Kafka Broker 0.10.0 > Hadoop 2.7.0 >Reporter: Melmoth > Fix For: 1.2.0, 1.1.3 > > > **Original reported ticket title: Last kafka message gets consumed twice when > restarting job** > There seem to be an issue with the offset management in Flink. When a job is > stopped and startet again, a message from the previous offset is read again. > I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new > consumer group and emitted one record. > You can cleary see, that the consumer waits for a new record at offset > 4848911, which is correct. After restarting, it consumes a record at 4848910, > causing the record to be consumed more than once. > I checked the offset with the Kafka CMD tools, the commited offset in > zookeeper is 4848910. > Here is my log output: > {code} > 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient >- Initiating connection to node 2147482646 at hdp1:6667. > 10:29:24,225 DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching > committed offsets for partitions: [myTopic-0] > 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient >- Completed connection to node 2147482646 > 10:29:24,234 DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No > committed offset for partition myTopic-0 > 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher >- Resetting offset for partition myTopic-0 to latest offset. 
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher >- Fetched offset 4848910 for partition myTopic-0 > 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > -- Inserting a new event here > 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Adding fetched record for partition myTopic-0 with offset 4848910 to > buffered record list > 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Returning fetched records at offset 4848910 for assigned partition > myTopic-0 and update position to 4848911 > 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Triggering checkpoint 6 @ 1473841823887 > 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Completed checkpoint 6 (in 96 ms) > 10:30:24,196 TRACE > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending > offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, > metadata=''}} to Node(2147482646, hdp1, 6667) > 10:30:24,204 DEBUG > 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed > offset 4848910 for partition myTopic-0 > 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:48,057 INFO org.apache.flink.runtime.blob.BlobServer >- Stopped BLOB server at 0.0.0.0:2946 > -- Restarting job > 10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient >- Initiating
[jira] [Commented] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493927#comment-15493927 ] Tzu-Li (Gordon) Tai commented on FLINK-4618: I've confirmed that the problem only exists in the 0.9 consumer. [~melmoth] and [~mbarlocker], since you were the original reporters of the bug, would you want to work on fixing it and open a PR? You can ping me to help review when you're ready :) Otherwise I can also pick it up. In any case, let me know ;) > FlinkKafkaConsumer09 should start from the next record on startup from > offsets in Kafka > --- > > Key: FLINK-4618 > URL: https://issues.apache.org/jira/browse/FLINK-4618 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.2 > Environment: Flink 1.1.2 > Kafka Broker 0.10.0 > Hadoop 2.7.0 >Reporter: Melmoth > > **Original reported ticket title: Last kafka message gets consumed twice when > restarting job** > There seem to be an issue with the offset management in Flink. When a job is > stopped and startet again, a message from the previous offset is read again. > I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new > consumer group and emitted one record. > You can cleary see, that the consumer waits for a new record at offset > 4848911, which is correct. After restarting, it consumes a record at 4848910, > causing the record to be consumed more than once. > I checked the offset with the Kafka CMD tools, the commited offset in > zookeeper is 4848910. > Here is my log output: > {code} > 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient >- Initiating connection to node 2147482646 at hdp1:6667. 
> 10:29:24,225 DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching > committed offsets for partitions: [myTopic-0] > 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient >- Completed connection to node 2147482646 > 10:29:24,234 DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No > committed offset for partition myTopic-0 > 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher >- Resetting offset for partition myTopic-0 to latest offset. > 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher >- Fetched offset 4848910 for partition myTopic-0 > 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848910 > -- Inserting a new event here > 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Adding fetched record for partition myTopic-0 with offset 4848910 to > buffered record list > 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Returning fetched records at offset 4848910 for assigned partition > myTopic-0 and update position to 4848911 > 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Triggering checkpoint 6 @ 1473841823887 > 10:30:23,957 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator >- Completed checkpoint 6 (in 96 ms) > 10:30:24,196 TRACE > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending > offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, > metadata=''}} to Node(2147482646, hdp1, 6667) > 10:30:24,204 DEBUG > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed > offset 4848910 for partition myTopic-0 > 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition myTopic-0 at offset 4848911 > 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher >- Added fetch request for partition
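The log above shows the essence of the bug: the consumer reads record 4848910 and advances its position to 4848911, yet the offset-commit request carries 4848910. A minimal sketch (plain Java, not Flink's actual connector code) of the offset convention involved — per the Kafka consumer contract, the committed offset is the offset of the *next* record to read, i.e. last processed offset + 1, so committing the last processed offset itself makes the consumer re-read that record on restart:

```java
// Sketch of Kafka's offset-commit convention; class and method names are
// illustrative, not Flink's real connector API.
public class OffsetCommitSemantics {

    /** Offset to commit after fully processing the record at lastProcessedOffset. */
    public static long offsetToCommit(long lastProcessedOffset) {
        // Kafka resumes AT the committed offset, so commit the NEXT offset.
        return lastProcessedOffset + 1;
    }

    public static void main(String[] args) {
        long lastProcessed = 4848910L;  // the record returned to the job in the log above
        // Committing 4848911 means a restarted consumer resumes at 4848911:
        // no duplicate. Committing 4848910 (as in the log) re-reads it.
        System.out.println("commit " + offsetToCommit(lastProcessed));  // commit 4848911
    }
}
```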
[jira] [Updated] (FLINK-4618) Kafka Consumer 0.9 should start from the next record on startup from Kafka offsets
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4618: --- Summary: Kafka Consumer 0.9 should start from the next record on startup from Kafka offsets (was: Kafka Consumer 0.9 should start from the next record when startup from Kafka offsets)
[jira] [Updated] (FLINK-4618) FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4618: --- Summary: FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka (was: Kafka Consumer 0.9 should start from the next record on startup from offsets in Kafka)
[jira] [Updated] (FLINK-4618) Kafka Consumer 0.9 should start from the next record on startup from offsets in Kafka
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4618: --- Summary: Kafka Consumer 0.9 should start from the next record on startup from offsets in Kafka (was: Kafka Consumer 0.9 should start from the next record on startup from Kafka offsets)
[jira] [Updated] (FLINK-4618) Kafka Consumer 0.9 should start from the next record when startup from Kafka offsets
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4618: --- Description: **Original reported ticket title: Last kafka message gets consumed twice when restarting job** There seems to be an issue with the offset management in Flink. When a job is stopped and started again, a message from the previous offset is read again. I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new consumer group and emitted one record. You can clearly see that the consumer waits for a new record at offset 4848911, which is correct. After restarting, it consumes a record at 4848910, causing the record to be consumed more than once. I checked the offset with the Kafka CMD tools, the committed offset in ZooKeeper is 4848910. Here is my log output: {code} 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667. 10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0] 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646 10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No committed offset for partition myTopic-0 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to latest offset. 
10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 4848910 for partition myTopic-0 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910 -- Inserting a new event here 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911 10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1473841823887 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911 10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 96 ms) 10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667) 10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed offset 4848910 for 
partition myTopic-0 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911 10:30:24,963 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911 10:30:48,057 INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:2946 -- Restarting job 10:32:01,672 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667. 10:32:01,673 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0] 10:32:01,677 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646 // See below! Shouldn't the offset be 4848911? 10:32:01,682 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to the committed offset 4848910 10:32:01,683 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910 10:32:01,685 DEBUG org.apache.kafka.clients.NetworkClient
[jira] [Updated] (FLINK-4618) Kafka Consumer 0.9 should start from the next record when startup from Kafka offsets
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4618: --- Summary: Kafka Consumer 0.9 should start from the next record when startup from Kafka offsets (was: Last kafka message gets consumed twice when restarting job)
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493897#comment-15493897 ] ASF GitHub Bot commented on FLINK-4615: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79010283 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java --- @@ -87,4 +87,6 @@ * @throws Exception Exceptions may be forwarded. */ void cancel() throws Exception; + + void resetForIterativeTasks() throws Exception; --- End diff -- > You mean all drivers will now become a type of ResettableDriver? Yes, this was the idea in my comment [1] on the Jira. I'm sorry if I wasn't clear enough there. The advantage of this is that you wouldn't need to add more logic in `BatchTask`, `AbstractIterativeTask` and its descendants, because these already handle the situation when the driver is an instance of `ResettableDriver`. (See e.g. the beginning of `reinstantiateDriver`.) [1] https://issues.apache.org/jira/browse/FLINK-3322?focusedCommentId=15474531=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15474531 > Reusing the memory allocated for the drivers and iterators > -- > > Key: FLINK-4615 > URL: https://issues.apache.org/jira/browse/FLINK-4615 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > Fix For: 1.0.0 > > > Raising as a subtask so that it can be committed individually and reviewed more closely. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
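The pattern discussed in the review thread can be sketched as follows. This is a simplified, hypothetical model — the interface and method names mirror the discussion (`ResettableDriver`, a reset between iteration supersteps), but the real Flink signatures differ; the point is only that a task can branch on `instanceof ResettableDriver` and reuse the driver (and its memory) instead of re-instantiating it:

```java
// Hypothetical sketch of the ResettableDriver dispatch discussed above.
public class ResettableDriverSketch {

    interface Driver { void run(); }

    // Drivers that can be reset in place, reusing their memory pages.
    interface ResettableDriver extends Driver { void reset(); }

    // Toy implementation that just counts how often it was reset.
    static class CountingDriver implements ResettableDriver {
        int resets = 0;
        public void run() { /* no-op for the sketch */ }
        public void reset() { resets++; }
    }

    /**
     * Called between iteration supersteps. Returns true if the driver was
     * reset in place; false means the caller must re-instantiate it
     * (which re-allocates memory and creates GC pressure).
     */
    static boolean prepareNextSuperstep(Driver driver) {
        if (driver instanceof ResettableDriver) {
            ((ResettableDriver) driver).reset();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountingDriver d = new CountingDriver();
        prepareNextSuperstep(d);
        prepareNextSuperstep(d);
        System.out.println("resets = " + d.resets);  // resets = 2
    }
}
```

Making every driver implement such an interface is what lets the existing `instanceof`-based handling in the task classes be reused, as the comment suggests.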
[jira] [Reopened] (FLINK-4618) Last kafka message gets consumed twice when restarting job
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reopened FLINK-4618:
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493858#comment-15493858 ] ASF GitHub Bot commented on FLINK-4615: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79007600 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java --- @@ -87,4 +87,6 @@ * @throws Exception Exceptions may be forwarded. */ void cancel() throws Exception; + + void resetForIterativeTasks() throws Exception; --- End diff -- I saw that code. But am not sure how to do that. Driver is actually instantiated from the config. So how to ensure that it is of type ResettableDriver? You mean all drivers will now become a type of ResettableDriver?
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493859#comment-15493859 ]

ASF GitHub Bot commented on FLINK-3322:
---------------------------------------

Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2495

@ggevay and @StephanEwen - any feedback/reviews here.

> MemoryManager creates too much GC pressure with iterative jobs
> --------------------------------------------------------------
>
>                 Key: FLINK-3322
>                 URL: https://issues.apache.org/jira/browse/FLINK-3322
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.0.0
>            Reporter: Gabor Gevay
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0
>
>         Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
>
> When taskmanager.memory.preallocate is false (the default), released memory segments are not added to a pool, but the GC is expected to take care of them. This puts too much pressure on the GC with iterative jobs, where the operators reallocate all memory at every superstep.
> See the following discussion on the mailing list: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue: https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. (It will generate some lookup tables to /tmp on first run for a few minutes.)
> (I think the slowdown might also depend somewhat on taskmanager.memory.fraction, because more unused non-managed memory results in rarer GCs.)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
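For reference, the workaround implied by the issue description above is to enable pre-allocation so that segments stay pooled instead of being returned to the GC. A hedged flink-conf.yaml fragment (option names as used in Flink 1.x; the 0.7 fraction is stated here as an assumption about the default, per the issue text only the preallocate default of false is confirmed):

```yaml
# Keep managed memory segments pooled instead of handing them to the GC.
# The default is false, which is the setting that triggers the GC
# pressure described in FLINK-3322 for iterative jobs.
taskmanager.memory.preallocate: true

# Fraction of free memory the MemoryManager takes; mentioned in the
# issue as also influencing GC frequency.
taskmanager.memory.fraction: 0.7
```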
[jira] [Commented] (FLINK-4618) Last kafka message gets consumed twice when restarting job
[ https://issues.apache.org/jira/browse/FLINK-4618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493845#comment-15493845 ]

Tzu-Li (Gordon) Tai commented on FLINK-4618:
--------------------------------------------

Hi [~melmoth], I've revisited this issue, and I think this may actually be a bug. Like you mentioned on SO, the Kafka consumer is either committing one offset too few back to ZK, or it is supposed to start from the next record when starting from offsets found in ZK. Note that this still has nothing to do with Flink's exactly-once guarantee, though; it is simply that the consumer isn't starting at the right place when resuming from offsets found in ZK. The savepoint / exactly-once descriptions I gave before still hold. I'm sorry for drawing the wrong conclusion about this bug in the first place. I'll update this ticket once I confirm the bug!

> Last kafka message gets consumed twice when restarting job
> ----------------------------------------------------------
>
>                 Key: FLINK-4618
>                 URL: https://issues.apache.org/jira/browse/FLINK-4618
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.2
>         Environment: Flink 1.1.2
>                      Kafka Broker 0.10.0
>                      Hadoop 2.7.0
>            Reporter: Melmoth
>
> There seems to be an issue with the offset management in Flink. When a job is stopped and started again, a message from the previous offset is read again.
> I enabled checkpoints (EXACTLY_ONCE) and FsStateBackend. I started with a new consumer group and emitted one record.
> You can clearly see that the consumer waits for a new record at offset 4848911, which is correct. After restarting, it consumes a record at 4848910, causing the record to be consumed more than once.
> I checked the offset with the Kafka CMD tools; the committed offset in ZooKeeper is 4848910.
> Here is my log output:
> {code}
> 10:29:24,225 DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147482646 at hdp1:6667.
> 10:29:24,225 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Fetching committed offsets for partitions: [myTopic-0]
> 10:29:24,228 DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147482646
> 10:29:24,234 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - No committed offset for partition myTopic-0
> 10:29:24,238 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition myTopic-0 to latest offset.
> 10:29:24,244 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 4848910 for partition myTopic-0
> 10:29:24,245 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:24,773 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> 10:29:25,276 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848910
> -- Inserting a new event here
> 10:30:22,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Adding fetched record for partition myTopic-0 with offset 4848910 to buffered record list
> 10:30:22,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Returning fetched records at offset 4848910 for assigned partition myTopic-0 and update position to 4848911
> 10:30:22,451 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:22,953 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,456 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,887 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1473841823887
> 10:30:23,957 TRACE org.apache.kafka.clients.consumer.internals.Fetcher - Added fetch request for partition myTopic-0 at offset 4848911
> 10:30:23,996 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 96 ms)
> 10:30:24,196 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Sending offset-commit request with {myTopic-0=OffsetAndMetadata{offset=4848910, metadata=''}} to Node(2147482646, hdp1, 6667)
> 10:30:24,204 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Committed offset 4848910 for partition myTopic-0
> 10:30:24,460 TRACE org.apache.kafka.clients.consumer.internals.Fetcher
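The off-by-one suspected above follows directly from Kafka's commit convention: the committed offset should denote the *next* record to consume, not the last record processed. The log shows the consumer's position advancing to 4848911 while 4848910 is committed. A self-contained sketch of the two behaviours, using the offsets from the log (an illustrative model only, not the connector's actual code):

```java
// Kafka convention: the committed offset is the position of the NEXT
// record to consume. Committing the last processed offset instead makes
// a restarted consumer re-read that record, producing the duplicate
// observed in FLINK-4618.
class OffsetCommitSketch {

    // Correct behaviour: the resume position is lastProcessed + 1,
    // so no record is consumed twice after a restart.
    static long commitNextPosition(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    // The suspected bug: committing the last processed offset itself,
    // so the restarted consumer starts one record too early.
    static long commitLastProcessed(long lastProcessedOffset) {
        return lastProcessedOffset;
    }
}
```

With the log's numbers: record 4848910 is processed, so the commit should be 4848911; committing 4848910 makes the restarted job fetch 4848910 again.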
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493841#comment-15493841 ]

ASF GitHub Bot commented on FLINK-4615:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79006658

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java ---
@@ -82,43 +82,46 @@ protected void initialize() throws Exception {
 	public void run() throws Exception {
 
 		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
+		try {
+			while (this.running && !terminationRequested()) {
-		while (this.running && !terminationRequested()) {
+				if (log.isInfoEnabled()) {
+					log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
+				}
-			if (log.isInfoEnabled()) {
-				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
-			}
+				super.run();
-			super.run();
+				// check if termination was requested
+				verifyEndOfSuperstepState();
-			// check if termination was requested
-			verifyEndOfSuperstepState();
+				if (isWorksetUpdate && isWorksetIteration) {
+					long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
+					worksetAggregator.aggregate(numCollected);
+				}
-			if (isWorksetUpdate && isWorksetIteration) {
-				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
-				worksetAggregator.aggregate(numCollected);
-			}
-
-			if (log.isInfoEnabled()) {
-				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
-			}
-
-			// let the successors know that the end of this superstep data is reached
-			sendEndOfSuperstep();
-
-			if (isWorksetUpdate) {
-				// notify iteration head if responsible for workset update
-				worksetBackChannel.notifyOfEndOfSuperstep();
-			}
-
-			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+				if (log.isInfoEnabled()) {
+					log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
+				}
-			if (terminated) {
-				requestTermination();
-			}
-			else {
-				incrementIterationCounter();
+				// let the successors know that the end of this superstep data is reached
+				sendEndOfSuperstep();
+
+				if (isWorksetUpdate) {
+					// notify iteration head if responsible for workset update
+					worksetBackChannel.notifyOfEndOfSuperstep();
+				}
+
+				boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+
+				if (terminated) {
+					requestTermination();
+					this.driver.cleanup();
--- End diff --

I am very sorry. That was a mistake, an oversight. I forgot to remove the cleanup() called after termination.

> Reusing the memory allocated for the drivers and iterators
> ----------------------------------------------------------
>
>                 Key: FLINK-4615
>                 URL: https://issues.apache.org/jira/browse/FLINK-4615
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: ramkrishna.s.vasudevan
>            Assignee: ramkrishna.s.vasudevan
>             Fix For: 1.0.0
>
> Raising as a subtask so that changes can be committed individually and reviewed more closely.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493835#comment-15493835 ]

ASF GitHub Bot commented on FLINK-4615:
---------------------------------------

Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79005879

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java ---
@@ -87,4 +87,6 @@
 	 * @throws Exception Exceptions may be forwarded.
 	 */
 	void cancel() throws Exception;
+
+	void resetForIterativeTasks() throws Exception;
--- End diff --

I think you should use the already existing `ResettableDriver` interface instead of introducing a new resetting method.

> Reusing the memory allocated for the drivers and iterators
> ----------------------------------------------------------
>
>                 Key: FLINK-4615
>                 URL: https://issues.apache.org/jira/browse/FLINK-4615
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: ramkrishna.s.vasudevan
>            Assignee: ramkrishna.s.vasudevan
>             Fix For: 1.0.0
>
> Raising as a subtask so that changes can be committed individually and reviewed more closely.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Assigned] (FLINK-4590) Some Table API tests are failing when debug lvl is set to DEBUG
[ https://issues.apache.org/jira/browse/FLINK-4590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther reassigned FLINK-4590:
-----------------------------------

    Assignee: Timo Walther

> Some Table API tests are failing when debug lvl is set to DEBUG
> ---------------------------------------------------------------
>
>                 Key: FLINK-4590
>                 URL: https://issues.apache.org/jira/browse/FLINK-4590
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Robert Metzger
>            Assignee: Timo Walther
>
> For debugging another issue, I've set the log level on Travis to DEBUG. After that, the Table API tests started failing:
> {code}
> Failed tests:
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule
> {code}
> Probably Calcite is executing additional assertions depending on the debug level.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493806#comment-15493806 ]

ASF GitHub Bot commented on FLINK-4615:
---------------------------------------

Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2496#discussion_r79003843

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java ---
@@ -82,43 +82,46 @@ protected void initialize() throws Exception {
 	public void run() throws Exception {
 
 		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
+		try {
+			while (this.running && !terminationRequested()) {
-		while (this.running && !terminationRequested()) {
+				if (log.isInfoEnabled()) {
+					log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
+				}
-			if (log.isInfoEnabled()) {
-				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
-			}
+				super.run();
-			super.run();
+				// check if termination was requested
+				verifyEndOfSuperstepState();
-			// check if termination was requested
-			verifyEndOfSuperstepState();
+				if (isWorksetUpdate && isWorksetIteration) {
+					long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
+					worksetAggregator.aggregate(numCollected);
+				}
-			if (isWorksetUpdate && isWorksetIteration) {
-				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
-				worksetAggregator.aggregate(numCollected);
-			}
-
-			if (log.isInfoEnabled()) {
-				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
-			}
-
-			// let the successors know that the end of this superstep data is reached
-			sendEndOfSuperstep();
-
-			if (isWorksetUpdate) {
-				// notify iteration head if responsible for workset update
-				worksetBackChannel.notifyOfEndOfSuperstep();
-			}
-
-			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+				if (log.isInfoEnabled()) {
+					log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
+				}
-			if (terminated) {
-				requestTermination();
-			}
-			else {
-				incrementIterationCounter();
+				// let the successors know that the end of this superstep data is reached
+				sendEndOfSuperstep();
+
+				if (isWorksetUpdate) {
+					// notify iteration head if responsible for workset update
+					worksetBackChannel.notifyOfEndOfSuperstep();
+				}
+
+				boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+
+				if (terminated) {
+					requestTermination();
+					this.driver.cleanup();
--- End diff --

`cleanup` will be called twice at the last step of the iteration, because it is also called in the `finally` block. This might potentially cause problems if one of the drivers assumes that `cleanup` will only be called once.

> Reusing the memory allocated for the drivers and iterators
> ----------------------------------------------------------
>
>                 Key: FLINK-4615
>                 URL: https://issues.apache.org/jira/browse/FLINK-4615
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: ramkrishna.s.vasudevan
>            Assignee: ramkrishna.s.vasudevan
>             Fix For: 1.0.0
>
> Raising as a subtask so that changes can be committed individually and reviewed more closely.
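One common way to defuse the double-`cleanup` hazard raised above is to make the call idempotent with a guard flag, so a second invocation (e.g. from a `finally` block after an explicit cleanup on termination) is a harmless no-op. A minimal illustrative sketch only; this is not the fix adopted in the PR, and `GuardedDriver` is an invented stand-in:

```java
// Guarding cleanup with a flag: the first call releases resources, any
// repeated call returns immediately. This protects drivers that assume
// cleanup() runs exactly once.
class GuardedDriver {
    private boolean cleanedUp = false;
    int cleanupRuns = 0; // counts how often the real teardown executed

    void cleanup() {
        if (cleanedUp) {
            return; // resources already released; ignore repeated calls
        }
        cleanedUp = true;
        cleanupRuns++; // stands in for releasing memory segments etc.
    }
}
```

The alternative, of course, is to structure the control flow so cleanup is reached exactly once, which is what removing the extra call achieves.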
[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493770#comment-15493770 ]

ASF GitHub Bot commented on FLINK-4546:
---------------------------------------

Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2454

It seems that Calcite is planning to add additional clauses to the SQL statement, like `SELECT STREAM x FROM y EMIT xyz`. If we really remove the keyword, we have to make sure that Calcite's parser will also parse `EMIT` without the `STREAM` keyword. @fhueske what do you think?

> Remove STREAM keyword in Stream SQL
> -----------------------------------
>
>                 Key: FLINK-4546
>                 URL: https://issues.apache.org/jira/browse/FLINK-4546
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> This is about unifying the Batch SQL and Stream SQL grammar, esp. removing the STREAM keyword in Stream SQL.
> Detailed discussion on the mailing list: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4623) Create Physical Execution Plan of a DataStream
[ https://issues.apache.org/jira/browse/FLINK-4623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-4623: Labels: starter (was: ) > Create Physical Execution Plan of a DataStream > -- > > Key: FLINK-4623 > URL: https://issues.apache.org/jira/browse/FLINK-4623 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > Labels: starter > > The {{StreamTableEnvironment#explain(Table)}} command for tables of a > {{StreamTableEnvironment}} only outputs the abstract syntax tree. It would be > helpful if the {{explain}} method could also generate a string from the > {{DataStream}} containing a physical execution plan. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4623) Create Physical Execution Plan of a DataStream
Timo Walther created FLINK-4623: --- Summary: Create Physical Execution Plan of a DataStream Key: FLINK-4623 URL: https://issues.apache.org/jira/browse/FLINK-4623 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther The {{StreamTableEnvironment#explain(Table)}} command for tables of a {{StreamTableEnvironment}} only outputs the abstract syntax tree. It would be helpful if the {{explain}} method could also generate a string from the {{DataStream}} containing a physical execution plan. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-4599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther resolved FLINK-4599.
---------------------------------

     Resolution: Fixed
    Fix Version/s: 1.2.0

Fixed in 545b72bee9b2297c9d1d2f5d59d6d839378fde92. I will create an issue regarding the Physical Stream Execution Plan.

> Add 'explain()' also to StreamTableEnvironment
> ----------------------------------------------
>
>                 Key: FLINK-4599
>                 URL: https://issues.apache.org/jira/browse/FLINK-4599
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Simone Robutti
>              Labels: starter
>             Fix For: 1.2.0
>
> Currently, only the BatchTableEnvironment supports the {{explain}} command for tables. We should also support it for the StreamTableEnvironment.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2485 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493730#comment-15493730 ]

ASF GitHub Bot commented on FLINK-3322:
---------------------------------------

Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2495

The build shows failure but not sure what is the failure. It does not show any specific test case.

> MemoryManager creates too much GC pressure with iterative jobs
> --------------------------------------------------------------
>
>                 Key: FLINK-3322
>                 URL: https://issues.apache.org/jira/browse/FLINK-3322
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.0.0
>            Reporter: Gabor Gevay
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0
>
>         Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
>
> When taskmanager.memory.preallocate is false (the default), released memory segments are not added to a pool, but the GC is expected to take care of them. This puts too much pressure on the GC with iterative jobs, where the operators reallocate all memory at every superstep.
> See the following discussion on the mailing list: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue: https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. (It will generate some lookup tables to /tmp on first run for a few minutes.)
> (I think the slowdown might also depend somewhat on taskmanager.memory.fraction, because more unused non-managed memory results in rarer GCs.)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] flink issue #2496: FLINK-4615 Reusing the memory allocated for the drivers a...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2496 Builds seem to pass with this latest commit. Any feedback here, @ggevay and @StephanEwen? ---
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493720#comment-15493720 ] ASF GitHub Bot commented on FLINK-4615: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2496 Builds seems to pass with this latest commit. Any feedback here @ggevay and @StephanEwen . > Reusing the memory allocated for the drivers and iterators > -- > > Key: FLINK-4615 > URL: https://issues.apache.org/jira/browse/FLINK-4615 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > Fix For: 1.0.0 > > > Raising as a subtask so that individually can be committed and for better > closer reviews. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2485: [Flink 4599] - Add 'explain()' also to StreamTableEnviron...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2485 Thanks @chobeat. I fixed minor formatting issues and extended the documentation a little bit. Will merge... ---
[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart
[ https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493667#comment-15493667 ] Matthew Barlocker commented on FLINK-4617: -- Sounds good. Thanks! > Kafka & Flink duplicate messages on restart > --- > > Key: FLINK-4617 > URL: https://issues.apache.org/jira/browse/FLINK-4617 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2 > Environment: Ubuntu 16.04 > Flink 1.1.* > Kafka 0.9.0.1 > Scala 2.11.7 > Java 1.8.0_91 >Reporter: Matthew Barlocker >Priority: Critical > > [StackOverflow > Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart] > Flink (the kafka connector) re-runs the last 3-9 messages it saw before it > was shut down. > *My code:* > {code} > import java.util.Properties > import org.apache.flink.streaming.api.windowing.time.Time > import org.apache.flink.streaming.api.scala._ > import org.apache.flink.streaming.api.CheckpointingMode > import org.apache.flink.streaming.connectors.kafka._ > import org.apache.flink.streaming.util.serialization._ > import org.apache.flink.runtime.state.filesystem._ > object Runner { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.enableCheckpointing(500) > env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints")) > > env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) > val properties = new Properties() > properties.setProperty("bootstrap.servers", "localhost:9092"); > properties.setProperty("group.id", "testing"); > val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new > SimpleStringSchema(), properties) > val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", > "testing-out", new SimpleStringSchema()) > env.addSource(kafkaConsumer) > .addSink(kafkaProducer) > env.execute() > } > } > {code} > 
*My sbt dependencies:* > {code} > libraryDependencies ++= Seq( > "org.apache.flink" %% "flink-scala" % "1.1.2", > "org.apache.flink" %% "flink-streaming-scala" % "1.1.2", > "org.apache.flink" %% "flink-clients" % "1.1.2", > "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2", > "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2" > ) > {code} > *My process:* > using 3 terminals: > {code} > TERM-1 start sbt, run program > TERM-2 create kafka topics testing-in and testing-out > TERM-2 run kafka-console-producer on testing-in topic > TERM-3 run kafka-console-consumer on testing-out topic > TERM-2 send data to kafka producer. > Wait for a couple seconds (buffers need to flush) > TERM-3 watch data appear in testing-out topic > Wait for at least 500 milliseconds for checkpointing to happen > TERM-1 stop sbt > TERM-1 run sbt > TERM-3 watch last few lines of data appear in testing-out topic > {code} > *My expectations:* > When there are no errors in the system, I expect to be able to turn flink on > and off without reprocessing messages that successfully completed the stream > in a prior run. > *My attempts to fix:* > I've added the call to setStateBackend, thinking that perhaps the default > memory backend just didn't remember correctly. That didn't seem to help. > I've removed the call to enableCheckpointing, hoping that perhaps there was a > separate mechanism to track state in Flink vs Zookeeper. That didn't seem to > help. > I've used different sinks, RollingFileSink, print(); hoping that maybe the > bug was in kafka. That didn't seem to help. > I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that > maybe the bug was in the latest version. That didn't seem to help. > I've added the zookeeper.connect config to the properties object, hoping that > the comment about it only being useful in 0.8 was wrong. That didn't seem to > help. > I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea > drfloob). 
That didn't seem to help.
[GitHub] flink pull request #2502: [FLINK-4550] [table] Clearly define SQL operator t...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/2502 [FLINK-4550] [table] Clearly define SQL operator table Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed This PR reduces the number of SQL operators to those that are fully supported. Since the SQL validator know which functions/operations are supported, this PR should also reduce the number of exceptions in code generation etc. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-4550 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2502.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2502 commit 1790235551cd54a3b5f7c7b38c251f0af8d2ce76 Author: twalthrDate: 2016-09-15T14:56:16Z [FLINK-4550] [table] Clearly define SQL operator table --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4550) Clearly define SQL operator table
[ https://issues.apache.org/jira/browse/FLINK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493575#comment-15493575 ] ASF GitHub Bot commented on FLINK-4550: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/2502 [FLINK-4550] [table] Clearly define SQL operator table Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed This PR reduces the number of SQL operators to those that are fully supported. Since the SQL validator know which functions/operations are supported, this PR should also reduce the number of exceptions in code generation etc. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-4550 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2502.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2502 commit 1790235551cd54a3b5f7c7b38c251f0af8d2ce76 Author: twalthrDate: 2016-09-15T14:56:16Z [FLINK-4550] [table] Clearly define SQL operator table > Clearly define SQL operator table > - > > Key: FLINK-4550 > URL: https://issues.apache.org/jira/browse/FLINK-4550 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Labels: starter > > Currently, we use {{SqlStdOperatorTable.instance()}} for setting all > supported operations. However, not all of them are actually supported. > {{FunctionCatalog}} should only return those operators that are tested and > documented. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4622) CLI help message should include 'savepoint' action
[ https://issues.apache.org/jira/browse/FLINK-4622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493533#comment-15493533 ] ASF GitHub Bot commented on FLINK-4622: --- GitHub user skidder opened a pull request: https://github.com/apache/flink/pull/2501 [FLINK-4622] CLI help message should include 'savepoint' action The Flink CLI help message should include the 'savepoint' action in the list of available actions. It currently looks like: ```shell bash-4.3# flink foo "foo" is not a valid action. Valid actions are "run", "list", "info", "stop", or "cancel". Specify the version option (-v or --version) to print Flink version. Specify the help option (-h or --help) to get help on the command. ``` This pull-request adds the `savepoint` action to the list of valid actions. You can merge this pull request into a Git repository by running: $ git pull https://github.com/skidder/flink flink-4622 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2501.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2501 commit a309fd4560ca5e08d05eb38fc377939aa87a5add Author: Scott KidderDate: 2016-09-15T14:46:29Z [FLINK-4622] Include 'savepoint' in the CLI help message > CLI help message should include 'savepoint' action > -- > > Key: FLINK-4622 > URL: https://issues.apache.org/jira/browse/FLINK-4622 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.1.2 >Reporter: Scott Kidder >Assignee: Scott Kidder >Priority: Trivial > Original Estimate: 1h > Remaining Estimate: 1h > > The Flink CLI help message should include the 'savepoint' action in the list > of available actions. It currently looks like: > {code} > bash-4.3# flink foo > "foo" is not a valid action. > Valid actions are "run", "list", "info", "stop", or "cancel". > Specify the version option (-v or --version) to print Flink version. 
> Specify the help option (-h or --help) to get help on the command. > {code}
[GitHub] flink pull request #2501: [FLINK-4622] CLI help message should include 'save...
GitHub user skidder opened a pull request: https://github.com/apache/flink/pull/2501 [FLINK-4622] CLI help message should include 'savepoint' action The Flink CLI help message should include the 'savepoint' action in the list of available actions. It currently looks like: ```shell bash-4.3# flink foo "foo" is not a valid action. Valid actions are "run", "list", "info", "stop", or "cancel". Specify the version option (-v or --version) to print Flink version. Specify the help option (-h or --help) to get help on the command. ``` This pull-request adds the `savepoint` action to the list of valid actions. You can merge this pull request into a Git repository by running: $ git pull https://github.com/skidder/flink flink-4622 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2501.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2501 commit a309fd4560ca5e08d05eb38fc377939aa87a5add Author: Scott KidderDate: 2016-09-15T14:46:29Z [FLINK-4622] Include 'savepoint' in the CLI help message --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4622) CLI help message should include 'savepoint' action
Scott Kidder created FLINK-4622: --- Summary: CLI help message should include 'savepoint' action Key: FLINK-4622 URL: https://issues.apache.org/jira/browse/FLINK-4622 Project: Flink Issue Type: Bug Components: Client Affects Versions: 1.1.2 Reporter: Scott Kidder Assignee: Scott Kidder Priority: Trivial The Flink CLI help message should include the 'savepoint' action in the list of available actions. It currently looks like: {code} bash-4.3# flink foo "foo" is not a valid action. Valid actions are "run", "list", "info", "stop", or "cancel". Specify the version option (-v or --version) to print Flink version. Specify the help option (-h or --help) to get help on the command. {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
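The fix described above amounts to adding "savepoint" to the CLI's list of valid actions so the error message stays accurate. A minimal, hypothetical Java sketch of such an action check (this is illustrative only, not Flink's actual CliFrontend code; the class and method names are invented):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a CLI action check whose error message
// enumerates every valid action, including "savepoint".
public class CliActionCheck {
    static final List<String> VALID_ACTIONS =
            Arrays.asList("run", "list", "info", "stop", "cancel", "savepoint");

    static String check(String action) {
        if (VALID_ACTIONS.contains(action)) {
            return "ok";
        }
        // Building the message from the list keeps it in sync with the
        // actions that are actually accepted.
        return "\"" + action + "\" is not a valid action. Valid actions are "
                + String.join(", ", VALID_ACTIONS) + ".";
    }
}
```

Deriving the help text from the same list that drives validation is what prevents this class of bug from recurring.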
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493517#comment-15493517 ] ASF GitHub Bot commented on FLINK-3702: --- Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2094 > As for this pull request I suggest to update the false Javadoc from the streaming API and document in the concepts that the aggregation field accessors and the keyselectors are slightly different due to there semantics. I've fixed the javadoc now in 154352ba879350bb1ab10b799456a67eb5819375. As for the differences between the key selectors and aggregations (which use `FieldAccessor`): I tried to consolidate the supported field expressions between the two in 7c2dceeb87b06963cc2618312f5f1039387003ad. Now the aggregations should accept any field expression that selects a single field and would be a valid key selector, and some more (selecting array elements). As for documenting that the aggregations don't work on a field that is still some composite type, I'm not sure if this is necessary, since this should be pretty obvious to the user. > The batch API aggregations do not have the overloaded version where you could pass a field accessor string. Here is the Jira for this: https://issues.apache.org/jira/browse/FLINK-4575 > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2094: [FLINK-3702] Make FieldAccessors support nested field exp...
Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2094 > As for this pull request I suggest to update the false Javadoc from the streaming API and document in the concepts that the aggregation field accessors and the keyselectors are slightly different due to there semantics. I've fixed the javadoc now in 154352ba879350bb1ab10b799456a67eb5819375. As for the differences between the key selectors and aggregations (which use `FieldAccessor`): I tried to consolidate the supported field expressions between the two in 7c2dceeb87b06963cc2618312f5f1039387003ad. Now the aggregations should accept any field expression that selects a single field and would be a valid key selector, and some more (selecting array elements). As for documenting that the aggregations don't work on a field that is still some composite type, I'm not sure if this is necessary, since this should be pretty obvious to the user. > The batch API aggregations do not have the overloaded version where you could pass a field accessor string. Here is the Jira for this: https://issues.apache.org/jira/browse/FLINK-4575 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
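The discussion above is about field expressions such as "inner.value" selecting a field inside a nested POJO. As a rough illustration of the concept (this is a plain reflection sketch, not Flink's actual FieldAccessor implementation; the class names are invented), resolving a dotted expression can be done by walking the parts one field at a time:

```java
import java.lang.reflect.Field;

// Minimal reflection sketch of resolving a dotted field expression like
// "inner.value" on nested POJOs with public fields. Flink's FieldAccessor
// generalizes this idea (with type information and code paths for tuples).
public class NestedFieldAccess {
    public static Object get(Object pojo, String expression) throws Exception {
        Object current = pojo;
        for (String part : expression.split("\\.")) {
            // getField only sees public fields; a real accessor would also
            // handle private fields, getters, and tuple positions.
            Field f = current.getClass().getField(part);
            current = f.get(current);
        }
        return current;
    }

    // Example nested POJOs (hypothetical).
    public static class Inner { public int value = 42; }
    public static class Outer { public Inner inner = new Inner(); }
}
```

With such an accessor, an aggregation like `sum("inner.value")` can read and update a single leaf field even though the record type itself is composite.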
[jira] [Commented] (FLINK-4561) replace all the scala version as a `scala.binary.version` property
[ https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493507#comment-15493507 ] ASF GitHub Bot commented on FLINK-4561: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2459 @shijinkui I understand your motivation, it is a good idea in general. Please do not feel discouraged to suggest improvements. There are some things that look like they could be done better, but actually cannot be changed, because of some tricky bugs or limitations in other parts (like build plugins). That was the case here, so sometimes we cannot change some things. > replace all the scala version as a `scala.binary.version` property > -- > > Key: FLINK-4561 > URL: https://issues.apache.org/jira/browse/FLINK-4561 > Project: Flink > Issue Type: Improvement >Reporter: shijinkui > > replace all the scala version(2.10) as a property `scala.binary.version` > defined in root pom properties. default scala version property is 2.10. > modify: > 1. dependency include scala version > 2. module defining include scala version > 3. scala version upgrade to 2.11.8 from 2.11.7 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
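The change proposed in FLINK-4561 boils down to declaring the Scala version once as a Maven property and interpolating it wherever an artifact id carries a Scala suffix. A hedged illustration of the pattern (the artifact ids and values below are examples, not the exact Flink pom):

```xml
<properties>
    <!-- single place to switch between Scala 2.10 and 2.11 builds -->
    <scala.binary.version>2.10</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <!-- suffix comes from the property instead of a hard-coded _2.10 -->
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${project.version}</version>
    </dependency>
</dependencies>
```

As Stephan's reply notes, some Maven build plugins do not resolve properties in all the places this pattern needs, which is why the change could not be applied wholesale.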
[GitHub] flink issue #2459: [FLINK-4561] replace all the scala version as a `scala.bi...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2459 @shijinkui I understand your motivation; it is a good idea in general. Please do not feel discouraged from suggesting improvements. There are some things that look like they could be done better, but actually cannot be changed because of some tricky bugs or limitations in other parts (like build plugins). That was the case here, so sometimes we cannot change some things. ---
[jira] [Commented] (FLINK-3442) Expose savepoint button on web ui
[ https://issues.apache.org/jira/browse/FLINK-3442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493485#comment-15493485 ] Scott Kidder commented on FLINK-3442: - This is duplicated by FLINK-4336 > Expose savepoint button on web ui > - > > Key: FLINK-3442 > URL: https://issues.apache.org/jira/browse/FLINK-3442 > Project: Flink > Issue Type: Improvement > Components: Streaming, Webfrontend >Reporter: Gyula Fora >Priority: Minor > > Similarly to Cancel there should be a Savepoint button to initiate a > savepoint for streaming jobs. > These 2 buttons should NOT be next to each other :)
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493427#comment-15493427 ] Vasia Kalavri commented on FLINK-2254: -- It sounds good to me [~ivan.mushketyk]. Please go ahead and break this into smaller tasks. We can then prioritize. Thanks! > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge has a source vertex in the > first set and a target vertex in the second set. We would like to > support efficient operations for this type of graph along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf).
[jira] [Commented] (FLINK-4620) Automatically creating savepoints
[ https://issues.apache.org/jira/browse/FLINK-4620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493397#comment-15493397 ] Stephan Ewen commented on FLINK-4620: - Adding [~uce] to this thread. There is a FLIP to unify checkpoints and savepoints: https://cwiki.apache.org/confluence/display/FLINK/FLIP-10%3A+Unify+Checkpoints+and+Savepoints I think periodic savepoints are quite reasonable and fit well with the concept of unified checkpoints and savepoints. > Automatically creating savepoints > - > > Key: FLINK-4620 > URL: https://issues.apache.org/jira/browse/FLINK-4620 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.1.2 >Reporter: Niels Basjes > > In the current versions of Flink you can run an external command and then a > savepoint is persisted in a durable location. > Feature request: Make this a lot more automatic and easy to use. > _Proposed workflow_ > # In my application I do something like this: > {code} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStateBackend(new FsStateBackend("hdfs:///tmp/applicationState")); > env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > env.enableAutomaticSavePoints(30); > env.enableAutomaticSavePointCleaner(10); > {code} > # When I start the application for the first time the state backend is > 'empty'. > I expect the system to start in a clean state. > After 10 minutes (30ms) a savepoint is created and stored. > # When I stop and start the topology again it will automatically restore the > last available savepoint. > Things to think about: > * Note that this feature still means the manual version is useful!! > * What to do on startup if the state is incompatible with the topology? Fail > the startup? > * How many automatic savepoints to we keep? Only the last one? > * Perhaps the API should allow multiple automatic savepoints at different > intervals in different locations. 
> {code} > // Make every 10 minutes and keep the last 10 > env.enableAutomaticSavePoints(30, new > FsStateBackend("hdfs:///tmp/applicationState"), 10); > // Make every 24 hours and keep the last 30 > // Useful for being able to reproduce a problem a few days later > env.enableAutomaticSavePoints(8640, new > FsStateBackend("hdfs:///tmp/applicationDailyStateSnapshot"), 30); > {code}
[jira] [Assigned] (FLINK-4550) Clearly define SQL operator table
[ https://issues.apache.org/jira/browse/FLINK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-4550: --- Assignee: Timo Walther > Clearly define SQL operator table > - > > Key: FLINK-4550 > URL: https://issues.apache.org/jira/browse/FLINK-4550 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > Labels: starter > > Currently, we use {{SqlStdOperatorTable.instance()}} for setting all > supported operations. However, not all of them are actually supported. > {{FunctionCatalog}} should only return those operators that are tested and > documented.
[jira] [Commented] (FLINK-4549) Test and document implicitly supported SQL functions
[ https://issues.apache.org/jira/browse/FLINK-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493229#comment-15493229 ] ASF GitHub Bot commented on FLINK-4549: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/2500 [FLINK-4549] [table] Test and document implicitly supported SQL functions Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed This PR tests and documents all SQL functions that are currently supported. Things that are not documented are not supposed to work. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink TableApiAllCalciteScalarFunctions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2500.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2500 commit 782fc7c09da25c44ecb61620acf2f7ce9d7077ad Author: twalthrDate: 2016-09-02T09:00:09Z [FLINK-4549] [table] Test and document implicitly supported SQL functions > Test and document implicitly supported SQL functions > > > Key: FLINK-4549 > URL: https://issues.apache.org/jira/browse/FLINK-4549 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Calcite supports many SQL functions by translating them into {{RexNode}} s. > However, SQL functions like {{NULLIF}}, {{OVERLAPS}} are neither tested nor > document although supported. > These functions should be tested and added to the documentation. We could > adopt parts from the Calcite documentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2500: [FLINK-4549] [table] Test and document implicitly ...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/2500 [FLINK-4549] [table] Test and document implicitly supported SQL functions Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed This PR tests and documents all SQL functions that are currently supported. Things that are not documented are not supposed to work. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink TableApiAllCalciteScalarFunctions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2500.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2500 commit 782fc7c09da25c44ecb61620acf2f7ce9d7077ad Author: twalthrDate: 2016-09-02T09:00:09Z [FLINK-4549] [table] Test and document implicitly supported SQL functions --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
---
[GitHub] flink pull request #2499: [FLINK-4485] close and remove user class loader af...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2499 [FLINK-4485] close and remove user class loader after job completion Keeping the user class loader around after job completion may lead to excessive temp space usage because all user jars are kept until the class loader is garbage collected. Tests showed that garbage collection can be delayed for a long time after the class loader is not referenced anymore. Note that for the class loader to not be referenced anymore, its job has to be removed from the archive. The fastest way to minimize temp space usage is to close and remove the URLClassloader after job completion. This requires us to keep a serializable copy of all data which needs the user class loader after job completion, e.g. to display data on the web interface. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-4485 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2499.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2499 commit 6ed17b9f5b9c13c80200ccf3db82bbfe727830bb Author: Maximilian MichelsDate: 2016-09-15T09:00:58Z [FLINK-4485] close and remove user class loader after job completion Keeping the user class loader around after job completion may lead to excessive temp space usage because all user jars are kept until the class loader is garbage collected. Tests showed that garbage collection can be delayed for a long time after the class loader is not referenced anymore. Note that for the class loader to not be referenced anymore, its job has to be removed from the archive. The fastest way to minimize temp space usage is to close and remove the URLClassloader after job completion. This requires us to keep a serializable copy of all data which needs the user class loader after job completion, e.g. 
to display data on the web interface. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
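For reference, `java.net.URLClassLoader` has implemented `Closeable` since Java 7, which is what makes the fix described above possible: closing the loader releases its open jar file handles so the temp files can actually be reclaimed. A minimal, hypothetical sketch of the idea (not the actual Flink patch; the jar path and loader setup are stand-ins):

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class CloseLoaderDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for a user-code jar staged in /tmp.
        Path jar = Files.createTempFile("user-job", ".jar");

        URLClassLoader userLoader =
                new URLClassLoader(new URL[] {jar.toUri().toURL()});

        // ... job runs, classes are loaded through userLoader ...

        // After job completion: close the loader so the JVM releases the
        // file handles on the jar, then drop all references to it. Only
        // then can deleting the jar actually free the disk space.
        userLoader.close();
        userLoader = null; // make the loader collectable

        Files.deleteIfExists(jar);
        System.out.println("jar deleted: " + !Files.exists(jar));
    }
}
```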
[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem
[ https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493019#comment-15493019 ] ASF GitHub Bot commented on FLINK-4485: --- GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2499 [FLINK-4485] close and remove user class loader after job completion Keeping the user class loader around after job completion may lead to excessive temp space usage because all user jars are kept until the class loader is garbage collected. Tests showed that garbage collection can be delayed for a long time after the class loader is not referenced anymore. Note that for the class loader to not be referenced anymore, its job has to be removed from the archive. The fastest way to minimize temp space usage is to close and remove the URLClassLoader after job completion. This requires us to keep a serializable copy of all data which needs the user class loader after job completion, e.g. to display data on the web interface. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-4485 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2499.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2499 commit 6ed17b9f5b9c13c80200ccf3db82bbfe727830bb Author: Maximilian Michels, Date: 2016-09-15T09:00:58Z [FLINK-4485] close and remove user class loader after job completion Keeping the user class loader around after job completion may lead to excessive temp space usage because all user jars are kept until the class loader is garbage collected. Tests showed that garbage collection can be delayed for a long time after the class loader is not referenced anymore. Note that for the class loader to not be referenced anymore, its job has to be removed from the archive. 
The fastest way to minimize temp space usage is to close and remove the URLClassLoader after job completion. This requires us to keep a serializable copy of all data which needs the user class loader after job completion, e.g. to display data on the web interface. > Finished jobs in yarn session fill /tmp filesystem > -- > > Key: FLINK-4485 > URL: https://issues.apache.org/jira/browse/FLINK-4485 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.1.0 >Reporter: Niels Basjes >Assignee: Maximilian Michels >Priority: Blocker > > On a Yarn cluster I start a yarn-session with a few containers and task slots. > Then I fire a 'large' number of Flink batch jobs in sequence against this > yarn session. It is the exact same job (java code) yet it gets different > parameters. > In this scenario it is exporting HBase tables to files in HDFS and the > parameters are about which data from which tables and the name of the target > directory. > After running several dozen jobs, job submissions started to fail and we > investigated. > We found that the cause was that on the Yarn node which was hosting the > jobmanager the /tmp file system was full (4GB was 100% full). > However, the output of {{du -hcs /tmp}} showed only 200MB in use. > We found that a very large file (we guess it is the jar of the job) was put > in /tmp, used, deleted, yet the file handle was not closed by the jobmanager. > As soon as we killed the jobmanager the disk space was freed. > The summary of the impact of this is that a yarn-session that receives enough > jobs brings down the Yarn node for all users. > See parts of the output we got from {{lsof}} below. 
> {code} > COMMAND PID USER FD TYPE DEVICE SIZE > NODE NAME > java 15034 nbasjes 550r REG 253,17 66219695 > 245 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 > (deleted) > java 15034 nbasjes 551r REG 253,17 66219695 > 252 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 > (deleted) > java 15034 nbasjes 552r REG 253,17 66219695 > 267 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 > (deleted) > java 15034 nbasjes 553r REG 253,17 66219695 > 250 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 > (deleted) > java 15034 nbasjes 554r REG 253,17 66219695 > 288 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 > (deleted) > java 15034
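The `(deleted)` entries in the {{lsof}} output above are the classic signature of unlinked files kept alive by open descriptors. A small, hypothetical Java sketch (POSIX semantics assumed, so Linux/macOS only; not Flink code) reproduces the effect:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;

public class DeletedHandleDemo {
    public static void main(String[] args) throws Exception {
        File blob = File.createTempFile("blobStore-demo", ".tmp");
        try (FileOutputStream out = new FileOutputStream(blob)) {
            out.write(new byte[16 * 1024]);
        }

        // Open a descriptor and then delete the file, as happened with the
        // BLOB store temp files in the report above. Under POSIX, the
        // blocks stay allocated while any descriptor is still open.
        FileInputStream in = new FileInputStream(blob);
        boolean deleted = blob.delete();

        // The name is gone, but the data is still readable via the handle,
        // i.e. the disk space is still in use -- exactly what lsof showed.
        int firstByte = in.read();

        in.close(); // only now can the kernel actually free the space
        System.out.println("deleted=" + deleted + ", firstByte=" + firstByte);
    }
}
```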
[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts
[ https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492928#comment-15492928 ] ASF GitHub Bot commented on FLINK-3030: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2448 @StephanEwen Should I change anything else in this PR? > Enhance Dashboard to show Execution Attempts > > > Key: FLINK-3030 > URL: https://issues.apache.org/jira/browse/FLINK-3030 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk > Fix For: 1.0.0 > > > Currently, the web dashboard shows only the latest execution attempt. We > should make all execution attempts and their accumulators available for > inspection. > The REST monitoring API supports this, so it should be a change only to the > frontend part. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2448: [FLINK-3030][web frontend] Enhance dashboard to show exec...
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2448 @StephanEwen Should I change anything else in this PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4621) Improve decimal literals of SQL API
Timo Walther created FLINK-4621: --- Summary: Improve decimal literals of SQL API Key: FLINK-4621 URL: https://issues.apache.org/jira/browse/FLINK-4621 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther Currently, all SQL {{DECIMAL}} types are converted to BigDecimals internally. By default, the SQL parser creates {{DECIMAL}} literals for any number, e.g. {{SELECT 1.0, 12, -0.5 FROM x}}. I think it would be better if these simple numbers were represented as Java primitives instead of objects. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
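To illustrate the trade-off the issue describes, here is a hypothetical sketch (not Flink internals) contrasting a heap-allocated `BigDecimal` literal with a primitive, and showing why `BigDecimal` still matters when exact decimal arithmetic is required:

```java
import java.math.BigDecimal;

public class DecimalLiteralDemo {
    public static void main(String[] args) {
        // Today: every DECIMAL literal becomes an object on the heap.
        BigDecimal asObject = new BigDecimal("1.0");

        // Proposed: simple numeric literals as primitives -- no allocation,
        // cheaper arithmetic in tight operator loops.
        double asPrimitive = 1.0;

        // Same value either way for simple literals like 1.0, 12, -0.5:
        System.out.println(asObject.doubleValue() == asPrimitive);

        // ... but BigDecimal is still needed when precision/scale matter,
        // because binary floating point cannot represent 0.1 exactly:
        System.out.println(new BigDecimal("0.1").add(new BigDecimal("0.2")));
        System.out.println(0.1 + 0.2); // shows a binary rounding artifact
    }
}
```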
[jira] [Closed] (FLINK-1135) Blog post with topic "Accessing Data Stored in Hive with Flink"
[ https://issues.apache.org/jira/browse/FLINK-1135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther closed FLINK-1135. --- Resolution: Won't Fix > Blog post with topic "Accessing Data Stored in Hive with Flink" > --- > > Key: FLINK-1135 > URL: https://issues.apache.org/jira/browse/FLINK-1135 > Project: Flink > Issue Type: Improvement > Components: Project Website >Reporter: Timo Walther >Assignee: Robert Metzger >Priority: Minor > Attachments: 2014-09-29-querying-hive.md > > > Recently, I implemented a Flink job that accessed Hive. Maybe someone else is > going to try this. I created a blog post for the website to share my > experience. > You'll find the blog post file attached. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1135) Blog post with topic "Accessing Data Stored in Hive with Flink"
[ https://issues.apache.org/jira/browse/FLINK-1135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492738#comment-15492738 ] Timo Walther commented on FLINK-1135: - No, this won't happen. I will close the issue. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem
[ https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492595#comment-15492595 ] Maximilian Michels commented on FLINK-4485: --- Yes, I was able to reproduce the problem with your test application yesterday. Checked out the relevant code and I think I'm on to something.
[jira] [Assigned] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem
[ https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-4485: - Assignee: Maximilian Michels -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host
[ https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492554#comment-15492554 ] ASF GitHub Bot commented on FLINK-3857: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1962 Thanks @HungUnicorn, that's useful info. I wonder, though, if this config should be set by the user instead of being set internally by the connector. > Add reconnect attempt to Elasticsearch host > --- > > Key: FLINK-3857 > URL: https://issues.apache.org/jira/browse/FLINK-3857 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.1.0, 1.0.2 >Reporter: Fabian Hueske >Assignee: Subhobrata Dey > > Currently, the connection to the Elasticsearch host is opened in > {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a > changed DNS entry), the sink fails. > I propose to catch the Exception for lost connections in the {{invoke()}} > method and try to re-open the connection for a configurable number of times > with a certain delay. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1962 Thanks @HungUnicorn, that's useful info. I wonder, though, if this config should be set by the user instead of being set internally by the connector. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
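The proposal in FLINK-3857 — catch the exception in {{invoke()}} and try to re-open the connection a configurable number of times with a certain delay — can be sketched generically as follows. This is a hypothetical helper, not the connector's actual code; the names and the sleep-based backoff are illustrative assumptions:

```java
public class RetryDemo {
    /** Hypothetical action that may fail, e.g. re-opening a connection. */
    interface Action { void run() throws Exception; }

    /** Retries an action up to maxRetries extra times with a fixed delay. */
    static void runWithRetries(Action action, int maxRetries, long delayMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                action.run();
                return; // success, stop retrying
            } catch (Exception e) {
                last = e; // e.g. a lost Elasticsearch connection
                if (attempt < maxRetries) {
                    Thread.sleep(delayMillis); // back off before re-opening
                }
            }
        }
        throw last; // all attempts exhausted -> fail the sink
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated connection that only comes back on the third attempt.
        runWithRetries(() -> {
            if (++calls[0] < 3) throw new RuntimeException("connection lost");
        }, 5, 10);
        System.out.println("succeeded after " + calls[0] + " attempts");
    }
}
```

Whether the retry count and delay should be user-facing configuration, as the comment above asks, is exactly the kind of decision this sketch leaves open.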
[jira] [Closed] (FLINK-4617) Kafka & Flink duplicate messages on restart
[ https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-4617. -- > Kafka & Flink duplicate messages on restart > --- > > Key: FLINK-4617 > URL: https://issues.apache.org/jira/browse/FLINK-4617 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, State Backends, Checkpointing >Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2 > Environment: Ubuntu 16.04 > Flink 1.1.* > Kafka 0.9.0.1 > Scala 2.11.7 > Java 1.8.0_91 >Reporter: Matthew Barlocker >Priority: Critical > > [StackOverflow > Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart] > Flink (the kafka connector) re-runs the last 3-9 messages it saw before it > was shut down. > *My code:* > {code} > import java.util.Properties > import org.apache.flink.streaming.api.windowing.time.Time > import org.apache.flink.streaming.api.scala._ > import org.apache.flink.streaming.api.CheckpointingMode > import org.apache.flink.streaming.connectors.kafka._ > import org.apache.flink.streaming.util.serialization._ > import org.apache.flink.runtime.state.filesystem._ > object Runner { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.enableCheckpointing(500) > env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints")) > > env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) > val properties = new Properties() > properties.setProperty("bootstrap.servers", "localhost:9092"); > properties.setProperty("group.id", "testing"); > val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new > SimpleStringSchema(), properties) > val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", > "testing-out", new SimpleStringSchema()) > env.addSource(kafkaConsumer) > .addSink(kafkaProducer) > env.execute() > } > } > {code} > *My sbt dependencies:* > {code} > libraryDependencies 
++= Seq( > "org.apache.flink" %% "flink-scala" % "1.1.2", > "org.apache.flink" %% "flink-streaming-scala" % "1.1.2", > "org.apache.flink" %% "flink-clients" % "1.1.2", > "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2", > "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2" > ) > {code} > *My process:* > using 3 terminals: > {code} > TERM-1 start sbt, run program > TERM-2 create kafka topics testing-in and testing-out > TERM-2 run kafka-console-producer on testing-in topic > TERM-3 run kafka-console-consumer on testing-out topic > TERM-2 send data to kafka producer. > Wait for a couple seconds (buffers need to flush) > TERM-3 watch data appear in testing-out topic > Wait for at least 500 milliseconds for checkpointing to happen > TERM-1 stop sbt > TERM-1 run sbt > TERM-3 watch last few lines of data appear in testing-out topic > {code} > *My expectations:* > When there are no errors in the system, I expect to be able to turn flink on > and off without reprocessing messages that successfully completed the stream > in a prior run. > *My attempts to fix:* > I've added the call to setStateBackend, thinking that perhaps the default > memory backend just didn't remember correctly. That didn't seem to help. > I've removed the call to enableCheckpointing, hoping that perhaps there was a > separate mechanism to track state in Flink vs Zookeeper. That didn't seem to > help. > I've used different sinks, RollingFileSink, print(); hoping that maybe the > bug was in kafka. That didn't seem to help. > I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that > maybe the bug was in the latest version. That didn't seem to help. > I've added the zookeeper.connect config to the properties object, hoping that > the comment about it only being useful in 0.8 was wrong. That didn't seem to > help. > I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea > drfloob). That didn't seem to help. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
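Worth noting for readers of this thread (this is why the ticket was resolved as "Not A Bug"): Flink's exactly-once guarantee applies to its internal state. On restore, records between the last completed checkpoint and the failure are replayed, so a non-transactional sink can emit duplicates. One common workaround is to deduplicate downstream by a record id, sketched here with a hypothetical `id:payload` format (not a Flink API):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupDemo {
    public static void main(String[] args) {
        // Simulated sink input: a replay after restart re-emits records 2 and 3.
        List<String> incoming = List.of("1:a", "2:b", "3:c", "2:b", "3:c");

        Set<String> seenIds = new HashSet<>();
        List<String> delivered = new ArrayList<>();
        for (String record : incoming) {
            String id = record.split(":")[0]; // hypothetical record id
            if (seenIds.add(id)) {            // add() returns false for repeats
                delivered.add(record);
            }
        }
        System.out.println(delivered); // duplicates from the replay are dropped
    }
}
```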
[jira] [Resolved] (FLINK-4617) Kafka & Flink duplicate messages on restart
[ https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-4617. Resolution: Not A Bug -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart
[ https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492513#comment-15492513 ] Tzu-Li (Gordon) Tai commented on FLINK-4617: Hi [~mbarlocker], now that this is sorted out (on SO), I'm closing this ticket now ;) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
[ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492504#comment-15492504 ] ASF GitHub Bot commented on FLINK-4615: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2496 Updated PR. Handles reset() in a much better way. PageRankITCase exposed some cases. Also does the clean-up of the driver now. Still wondering whether there is a reason why the driver was instantiated every time for an iterative task; maybe I am missing the reason for that. If it is needed for sure, then this whole fix may not work. But the patch as such is better in terms of handling the reset() part. > Reusing the memory allocated for the drivers and iterators > -- > > Key: FLINK-4615 > URL: https://issues.apache.org/jira/browse/FLINK-4615 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > Fix For: 1.0.0 > > > Raising this as a subtask so that it can be committed individually and > reviewed more closely. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2496: FLINK-4615 Reusing the memory allocated for the drivers a...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2496 Updated PR. Handles reset() in a much better way. PageRankITCase exposed some cases. Also does the clean-up of the driver now. Still wondering whether there is a reason why the driver was instantiated every time for an iterative task; maybe I am missing the reason for that. If it is needed for sure, then this whole fix may not work. But the patch as such is better in terms of handling the reset() part. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
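The object-reuse idea discussed in this PR — allocate once and reset() between iterations instead of re-instantiating the driver's structures per superstep — can be sketched generically. This is a hypothetical buffer, not Flink's driver code:

```java
import java.util.Arrays;

public class ReusableBufferDemo {
    /** A buffer allocated once and reset() between iterations,
     *  instead of being re-instantiated for every superstep. */
    static final class ReusableBuffer {
        private final double[] data;
        private int count;

        ReusableBuffer(int capacity) { this.data = new double[capacity]; }

        void add(double v) { data[count++] = v; }

        void reset() { // cheap: no reallocation, just clear the used region
            Arrays.fill(data, 0, count, 0.0);
            count = 0;
        }

        int size() { return count; }
    }

    public static void main(String[] args) {
        ReusableBuffer buffer = new ReusableBuffer(1024); // allocate once
        for (int iteration = 0; iteration < 3; iteration++) {
            buffer.add(1.0);
            buffer.add(2.0);
            // ... driver consumes the buffer for this iteration ...
            buffer.reset(); // reuse the same memory in the next iteration
        }
        System.out.println("size after reset: " + buffer.size());
    }
}
```

The open question in the comment above — why the driver is re-instantiated per iterative task — determines whether this kind of reuse is safe to apply there.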