[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2618 Hi @mxm! Please review. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
[ https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15587764#comment-15587764 ] Gabor Gevay edited comment on FLINK-3999 at 10/19/16 5:47 AM: -- Won't Fix; See comment here: https://github.com/apache/flink/pull/2642#issuecomment-254628846 was (Author: ggevay): See comment here: https://github.com/apache/flink/pull/2642#issuecomment-254628846 > Rename the `running` flag in the drivers to `canceled` > -- > > Key: FLINK-3999 > URL: https://issues.apache.org/jira/browse/FLINK-3999 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Neelesh Srinivas Salian >Priority: Trivial > > The name of the {{running}} flag in the drivers doesn't reflect its usage: > when the operator just stops normally, then it is not running anymore, but > the {{running}} flag will still be true, since the {{running}} flag is only > set when cancelling. > It should be renamed, and the value inverted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
[ https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay resolved FLINK-3999. Resolution: Won't Fix See comment here: https://github.com/apache/flink/pull/2642#issuecomment-254628846
[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...
Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2642 OK, I see your point now @StephanEwen. @nssalian, I'm sorry for opening the jira; I didn't think it through from this perspective at the time.
[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
[ https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15587758#comment-15587758 ] ASF GitHub Bot commented on FLINK-3999: --- Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2642 OK, I see your point now @StephanEwen. @nssalian, I'm sorry for opening the jira; I didn't think it through from this perspective at the time.
[jira] [Updated] (FLINK-4854) Efficient Batch Operator in Streaming
[ https://issues.apache.org/jira/browse/FLINK-4854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaowei Jiang updated FLINK-4854: - Component/s: DataStream API > Efficient Batch Operator in Streaming > - > > Key: FLINK-4854 > URL: https://issues.apache.org/jira/browse/FLINK-4854 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Xiaowei Jiang >Assignee: MaGuowei > Labels: features > Original Estimate: 168h > Remaining Estimate: 168h > > Very often, it's more efficient to process a batch of records at once instead > of processing them one by one. We can use a window to achieve this > functionality. However, a window will store all records in state, which can be > costly. It's desirable to have an efficient implementation of a batch operator. > The batch operator works per task and behaves similarly to aligned windows. > Here is an example of how the interface looks to a user: > interface BatchFunction<T> { > // add the record to the buffer > // returns whether the batch is ready to be flushed > boolean addRecord(T record); > // process all pending records in the buffer > void flush(Collector<T> collector); > } > DataStream<T> ds = ... > BatchFunction<T> func = ... > ds.batch(func); > The operator calls addRecord for each record. The batch function saves the > record in its own buffer. addRecord returns whether the pending buffer should > be flushed; in that case, the operator invokes flush.
[jira] [Commented] (FLINK-4727) Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no data is read
[ https://issues.apache.org/jira/browse/FLINK-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15587389#comment-15587389 ] Tzu-Li (Gordon) Tai commented on FLINK-4727: Resolved for master via http://git-wip-us.apache.org/repos/asf/flink/commit/e4343ba > Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no > data is read > -- > > Key: FLINK-4727 > URL: https://issues.apache.org/jira/browse/FLINK-4727 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.2.0, 1.1.3 > > > This is basically the 0.9 version counterpart for FLINK-3440. > When the 0.9 consumer fetches initial offsets from Kafka on startup, but does > not have any data to read, it should also checkpoint & commit these initial > offsets.
[jira] [Created] (FLINK-4856) Add MapState for keyed streams
Xiaogang Shi created FLINK-4856: --- Summary: Add MapState for keyed streams Key: FLINK-4856 URL: https://issues.apache.org/jira/browse/FLINK-4856 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing Reporter: Xiaogang Shi Many states in keyed streams are organized as key-value pairs. Currently, these states are implemented by storing the entire map in a ValueState or a ListState. This implementation, however, is very costly because all entries have to be serialized/deserialized when updating a single entry. To improve the efficiency of these states, MapStates are urgently needed.
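The benefit of per-entry access described in the proposal can be illustrated with a minimal, self-contained Java sketch. All names here (SketchMapState, HashMapState) are hypothetical stand-ins, not the eventual Flink API; the point is only that a per-entry get/put interface lets a backend touch (and serialize) one entry at a time instead of the whole map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-entry state interface modeled on the proposal.
interface SketchMapState<K, V> {
    V get(K key);
    void put(K key, V value);   // touches a single entry only
    void remove(K key);
    Iterable<Map.Entry<K, V>> entries();
}

// In-memory stand-in. A real backend (e.g. RocksDB) could serialize
// entries individually, avoiding the whole-map (de)serialization that
// the ValueState<Map>/ListState workaround requires.
class HashMapState<K, V> implements SketchMapState<K, V> {
    private final Map<K, V> map = new HashMap<>();
    public V get(K key) { return map.get(key); }
    public void put(K key, V value) { map.put(key, value); }
    public void remove(K key) { map.remove(key); }
    public Iterable<Map.Entry<K, V>> entries() { return map.entrySet(); }
}

public class MapStateSketch {
    public static void main(String[] args) {
        SketchMapState<String, Long> counts = new HashMapState<>();
        counts.put("a", 1L);
        counts.put("b", 2L);
        // Updating "a" does not require rewriting "b".
        counts.put("a", counts.get("a") + 1);
        System.out.println(counts.get("a")); // 2
    }
}
```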
[jira] [Commented] (FLINK-4727) Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no data is read
[ https://issues.apache.org/jira/browse/FLINK-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15587370#comment-15587370 ] ASF GitHub Bot commented on FLINK-4727: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2585
[GitHub] flink pull request #2585: [FLINK-4727] [kafka-connector] Set missing initial...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2585
[jira] [Commented] (FLINK-4727) Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no data is read
[ https://issues.apache.org/jira/browse/FLINK-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15587354#comment-15587354 ] ASF GitHub Bot commented on FLINK-4727: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2585 The failing tests are unrelated, and related tests are covered and have passed. Merging this ...
[GitHub] flink issue #2585: [FLINK-4727] [kafka-connector] Set missing initial offset...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2585 The failing tests are unrelated, and related tests are covered and have passed. Merging this ...
[jira] [Created] (FLINK-4855) Add partitionedKeyBy to DataStream
Xiaowei Jiang created FLINK-4855: Summary: Add partitionedKeyBy to DataStream Key: FLINK-4855 URL: https://issues.apache.org/jira/browse/FLINK-4855 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Xiaowei Jiang Assignee: MaGuowei After we do any interesting operation (e.g. reduce) on a KeyedStream, the result becomes a DataStream. In a lot of cases, the output still has the same or compatible keys as the KeyedStream (logically). But to do further operations on these keys, we are forced to use keyBy again. This works semantically, but is costly in two aspects. First, it destroys the possibility of chaining, which is one of the most important optimization techniques. Second, keyBy will greatly expand the connected components of tasks, which has implications for failover optimization. To address this shortcoming, we propose a new operator, partitionedKeyBy: DataStream { public KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key) } Semantically, DataStream.partitionedKeyBy(key) is equivalent to DataStream.keyBy(partitionedKey), where partitionedKey is key plus the task id as an extra field. This guarantees that records from different tasks will never produce the same keys. With this, it's possible to do ds.keyBy(key1).reduce(func1) .partitionedKeyBy(key1).reduce(func2) .partitionedKeyBy(key2).reduce(func3); Most importantly, in certain cases, we will be able to chain these into a single vertex.
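The core guarantee of the proposal above, that a key extended by the producing task's id can never collide with the same key from another task, can be shown with a small self-contained Java sketch. The PartitionedKey class is hypothetical, introduced only to model the composite-key semantics described in the issue:

```java
import java.util.Objects;

// Illustrative composite key: partitionedKeyBy(key) ~ keyBy(key + task id).
final class PartitionedKey<K> {
    final int taskId;  // id of the subtask that produced the record
    final K key;       // the user key

    PartitionedKey(int taskId, K key) {
        this.taskId = taskId;
        this.key = key;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PartitionedKey)) return false;
        PartitionedKey<?> p = (PartitionedKey<?>) o;
        return taskId == p.taskId && Objects.equals(key, p.key);
    }

    @Override
    public int hashCode() {
        return 31 * taskId + Objects.hashCode(key);
    }
}

public class PartitionedKeyDemo {
    public static void main(String[] args) {
        // The same user key seen by two different tasks yields two distinct
        // composite keys, so state stays local to the producing task and the
        // downstream operator can be chained instead of re-shuffled.
        PartitionedKey<String> fromTask0 = new PartitionedKey<>(0, "user-42");
        PartitionedKey<String> fromTask1 = new PartitionedKey<>(1, "user-42");
        System.out.println(fromTask0.equals(fromTask1)); // false
    }
}
```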
[jira] [Created] (FLINK-4854) Efficient Batch Operator in Streaming
Xiaowei Jiang created FLINK-4854: Summary: Efficient Batch Operator in Streaming Key: FLINK-4854 URL: https://issues.apache.org/jira/browse/FLINK-4854 Project: Flink Issue Type: Improvement Reporter: Xiaowei Jiang Assignee: MaGuowei Very often, it's more efficient to process a batch of records at once instead of processing them one by one. We can use a window to achieve this functionality. However, a window will store all records in state, which can be costly. It's desirable to have an efficient implementation of a batch operator. The batch operator works per task and behaves similarly to aligned windows. Here is an example of how the interface looks to a user: interface BatchFunction<T> { // add the record to the buffer // returns whether the batch is ready to be flushed boolean addRecord(T record); // process all pending records in the buffer void flush(Collector<T> collector); } DataStream<T> ds = ... BatchFunction<T> func = ... ds.batch(func); The operator calls addRecord for each record. The batch function saves the record in its own buffer. addRecord returns whether the pending buffer should be flushed; in that case, the operator invokes flush.
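The addRecord/flush contract above can be sketched as a self-contained Java example. The FixedSizeBatch policy and the use of java.util.function.Consumer in place of Flink's Collector are assumptions made for illustration; the loop in main plays the role of the operator:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// The proposed interface (Consumer stands in for Flink's Collector here).
interface BatchFunction<T> {
    // add the record to the buffer; returns true when the batch should be flushed
    boolean addRecord(T record);
    // process all pending records in the buffer
    void flush(Consumer<T> collector);
}

// Hypothetical policy: flush every `batchSize` records.
class FixedSizeBatch<T> implements BatchFunction<T> {
    private final int batchSize;
    private final List<T> buffer = new ArrayList<>();

    FixedSizeBatch(int batchSize) { this.batchSize = batchSize; }

    public boolean addRecord(T record) {
        buffer.add(record);
        return buffer.size() >= batchSize;
    }

    public void flush(Consumer<T> collector) {
        buffer.forEach(collector);
        buffer.clear();
    }
}

public class BatchDemo {
    public static void main(String[] args) {
        BatchFunction<Integer> func = new FixedSizeBatch<>(3);
        List<Integer> out = new ArrayList<>();
        // The "operator": calls addRecord per record, flushes on request.
        for (int i = 1; i <= 7; i++) {
            if (func.addRecord(i)) {
                func.flush(out::add);
            }
        }
        System.out.println(out); // [1, 2, 3, 4, 5, 6] — record 7 is still buffered
    }
}
```

Note that only the records in the current (small) buffer are held in memory, which is the efficiency argument the issue makes against a window-based implementation.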
[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
[ https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15586694#comment-15586694 ] ASF GitHub Bot commented on FLINK-3999: --- Github user nssalian commented on the issue: https://github.com/apache/flink/pull/2642 @StephanEwen, thanks for that, that makes sense. But it is hard for someone new to understand the problem you mention. If someone familiar with the product could review new JIRAs for clear benefit/validity, that would help folks jumping on the tasks to know them better. I realize it may not be feasible, but it also saves time to let them know in advance. I can certainly close the PR and mark the issue Not a Problem. But if there is a better way to make this clearer, I would appreciate it before the effort is put in.
[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...
Github user nssalian commented on the issue: https://github.com/apache/flink/pull/2642 @StephanEwen, thanks for that, that makes sense. But it is hard for someone new to understand the problem you mention. If someone familiar with the product could review new JIRAs for clear benefit/validity, that would help folks jumping on the tasks to know them better. I realize it may not be feasible, but it also saves time to let them know in advance. I can certainly close the PR and mark the issue Not a Problem. But if there is a better way to make this clearer, I would appreciate it before the effort is put in.
[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LabeledVector
[ https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas FOURNIER updated FLINK-4850: --- Description: It seems that the evaluate operation is defined for Vector and not LabeledVector. It impacts the QuickStart guide for FlinkML when using SVM. 1- We need to update the documentation as follows: val astroTest:DataSet[(Vector,Double)] = MLUtils .readLibSVM(env, "src/main/resources/svmguide1.t") .map(l => (l.vector, l.label)) val predictionPairs = svm.evaluate(astroTest) 2- Update the code such that LabeledVector can be used with the evaluate method was: It seems that SVM has a predict operation for Vector and not LabeledVector. It impacts the QuickStart guide for FlinkML. We need to update the documentation as follows: val astroTest:DataSet[(Vector,Double)] = MLUtils .readLibSVM(env, "src/main/resources/svmguide1.t") .map(l => (l.vector, l.label)) val predictionPairs = svm.evaluate(astroTest) > FlinkML - SVM predict Operation for Vector and not LabeledVector > > > Key: FLINK-4850 > URL: https://issues.apache.org/jira/browse/FLINK-4850 > Project: Flink > Issue Type: Bug >Reporter: Thomas FOURNIER > > It seems that the evaluate operation is defined for Vector and not LabeledVector. > It impacts the QuickStart guide for FlinkML when using SVM. > 1- We need to update the documentation as follows: > val astroTest:DataSet[(Vector,Double)] = MLUtils > .readLibSVM(env, "src/main/resources/svmguide1.t") > .map(l => (l.vector, l.label)) > val predictionPairs = svm.evaluate(astroTest) > 2- Update the code such that LabeledVector can be used with the evaluate method
[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
[ https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15586580#comment-15586580 ] ASF GitHub Bot commented on FLINK-3999: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2642 I am a bit skeptical about these types of changes. Maintaining the stability of Flink is crucially important now. With changes like these we risk introducing subtle bugs without fixing a bug or improving functionality; it just changes a naming preference. The kind of bugs one can introduce with changes like that are very subtle (we have seen cases where the cancelling logic was fragile depending on how exactly the flag was set) and hard to catch in reviews; not everything is fully covered by bullet-proof tests in practice. I would actually suggest contributing to issues where there is a clearer benefit.
[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2642 I am a bit skeptical about these types of changes. Maintaining the stability of Flink is crucially important now. With changes like these we risk introducing subtle bugs without fixing a bug or improving functionality; it just changes a naming preference. The kind of bugs one can introduce with changes like that are very subtle (we have seen cases where the cancelling logic was fragile depending on how exactly the flag was set) and hard to catch in reviews; not everything is fully covered by bullet-proof tests in practice. I would actually suggest contributing to issues where there is a clearer benefit.
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15586564#comment-15586564 ] ASF GitHub Bot commented on FLINK-3930: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2425 Resolved merge conflicts and squashed commits to rebase with master > Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2425 Resolved merge conflicts and squashed commits to rebase with master
[jira] [Closed] (FLINK-4846) FlinkML - Pass "env" as an implicit parameter in MLUtils.readLibSVM
[ https://issues.apache.org/jira/browse/FLINK-4846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas FOURNIER closed FLINK-4846. -- Resolution: Invalid > FlinkML - Pass "env" as an implicit parameter in MLUtils.readLibSVM > > > Key: FLINK-4846 > URL: https://issues.apache.org/jira/browse/FLINK-4846 > Project: Flink > Issue Type: Improvement >Reporter: Thomas FOURNIER >Priority: Minor > > With Flink ML you can import a file via MLUtils.readLibSVM (import > org.apache.flink.ml.MLUtils) > For example: > val env = ExecutionEnvironment.getExecutionEnvironment > val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM(env, > "src/main/resources/svmguide1") > I'd like to pass "env" as an implicit parameter and use the method as such: > val astroTrain: DataSet[LabeledVector] = > MLUtils.readLibSVM("src/main/resources/svmguide1") > Is it ok (not a scala specialist yet :) ) ?
[jira] [Assigned] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime
[ https://issues.apache.org/jira/browse/FLINK-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-4852: --- Assignee: Aljoscha Krettek > ClassCastException when assigning Watermarks with > TimeCharacteristic.ProcessingTime > --- > > Key: FLINK-4852 > URL: https://issues.apache.org/jira/browse/FLINK-4852 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.0, 1.1.3 >Reporter: Maximilian Michels >Assignee: Aljoscha Krettek > Fix For: 1.2.0, 1.1.4 > > > As per FLINK-3688 and FLINK-2936 this should already have been resolved. Still, > when emitting Watermarks and using processing time, you get the following > ClassCastException: > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) > at > org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161) > at > org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:343) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340) > at > org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:58) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373) > ... 11 more > Caused by: java.lang.RuntimeException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340) > at > org.apache.flink.streaming.api.operators.StreamMap.processWatermark(StreamMap.java:44) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWa
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15586082#comment-15586082 ] ASF GitHub Bot commented on FLINK-4715: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2652 Concerning the Kafka test: From the logs, the test fails because a topic cannot be deleted. The ZooKeeper operation blocks and the test times out. I am pretty sure that this is unrelated, as no Flink is running at that point. > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code.
[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2652 Concerning the Kafka test: From the logs, the test fails because a topic cannot be deleted. The ZooKeeper operation blocks and the test times out. I am pretty sure that this is unrelated, as no Flink is running at that point.
[jira] [Commented] (FLINK-4445) Ignore unmatched state when restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-4445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15586067#comment-15586067 ] Stephan Ewen commented on FLINK-4445: - +1 for option (1) > Ignore unmatched state when restoring from savepoint > > > Key: FLINK-4445 > URL: https://issues.apache.org/jira/browse/FLINK-4445 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.1.1 >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > > When currently submitting a job with a savepoint, we require that all state > is matched to the new job. Many users have noted that this is overly strict. > I would like to loosen this and allow savepoints to be restored without > matching all state. > The following options come to mind: > (1) Keep the current behaviour, but add a flag to allow ignoring state when > restoring, e.g. {{bin/flink -s --ignoreUnmatchedState}}. This > would be non-API breaking. > (2) Ignore unmatched state and continue. Additionally add a flag to be strict > about checking the state, e.g. {{bin/flink -s --strict}}. This > would be API-breaking as the default behaviour would change. Users might be > confused by this because there is no straightforward way to notice that > nothing has been restored. > I'm not sure what's the best thing here. [~gyfora], [~aljoscha] What do you > think?
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585948#comment-15585948 ] ASF GitHub Bot commented on FLINK-4715: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2652 Travis gives the green light except for a Kafka failure. @StephanEwen @rmetzger Do you know whether this is a known issue? Or might it be a regression from this change? https://s3.amazonaws.com/archive.travis-ci.org/jobs/168613643/log.txt > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code.
[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2652 Travis gives the green light except for a Kafka failure. @StephanEwen @rmetzger Do you know whether this is a known issue? Or might it be a regression from this change? https://s3.amazonaws.com/archive.travis-ci.org/jobs/168613643/log.txt
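The behaviour proposed in FLINK-4715 is essentially a cancellation watchdog: wait a bounded time for the task to reach a terminal state, and escalate to a fatal error (process suicide) if it does not. A minimal sketch under those assumptions — `CancellationWatchdog` and the latch-based signalling are illustrative, not Flink's actual TaskManager code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CancellationWatchdog {

    /**
     * Waits up to timeoutMillis for the task to signal that cancellation
     * completed (via the latch). Returns true if it did; otherwise invokes
     * the fatal-error action (in the real TaskManager: terminate the
     * process) and returns false. This is the "safety net" against user
     * code that cannot be cancelled.
     */
    public static boolean awaitCancellation(
            CountDownLatch cancellationDone,
            long timeoutMillis,
            Runnable fatalErrorAction) throws InterruptedException {
        if (cancellationDone.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return true; // task cancelled cleanly
        }
        fatalErrorAction.run(); // escalate: e.g. notify a fatal error handler
        return false;
    }
}
```

Killing the whole process rather than leaking a stuck task trades availability of one TaskManager for a guarantee that no resources stay blocked.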
[GitHub] flink pull request #2656: [FLINK-4852] Remove Non-Multiplexing StreamRecordS...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2656 [FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer This is getting rid of a source of `ClassCastException`s that cropped up several times now. We simply always do the multiplexing now, which adds the cost of transmitting one byte for every element we transmit. R: @mxm and @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink remove-stream-record-serializer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2656.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2656 commit 87fd6b6d824c03f2ff6591af8ae28a9b6e65296b Author: Aljoscha Krettek Date: 2016-10-18T16:32:17Z [FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer This also renames MultiplexingStreamRecordSerializer to StreamElementSerializer.
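The "one byte per element" cost mentioned in the PR description comes from a tag written ahead of each element so that records and watermarks can share a single serialized stream; the non-multiplexing serializer omitted the tag and therefore could not carry watermarks, causing the `ClassCastException`. A minimal sketch of such a tagged encoding, with hypothetical names (this is not the actual `StreamElementSerializer`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TaggedElementCodec {
    static final byte TAG_RECORD = 0;
    static final byte TAG_WATERMARK = 1;

    /** Encodes a record payload with a leading tag byte. */
    public static byte[] encodeRecord(long value) throws IOException {
        return encode(TAG_RECORD, value);
    }

    /** Encodes a watermark timestamp with a leading tag byte. */
    public static byte[] encodeWatermark(long timestamp) throws IOException {
        return encode(TAG_WATERMARK, timestamp);
    }

    private static byte[] encode(byte tag, long payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeByte(tag);   // the one-byte multiplexing overhead
        out.writeLong(payload);
        return bos.toByteArray();
    }

    /** Returns "record:<v>" or "watermark:<ts>" depending on the tag. */
    public static String decode(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        byte tag = in.readByte();
        long payload = in.readLong();
        return (tag == TAG_RECORD ? "record:" : "watermark:") + payload;
    }
}
```

Because every element carries the tag, the receiving side never has to guess whether the next bytes are a record or a watermark.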
[jira] [Commented] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime
[ https://issues.apache.org/jira/browse/FLINK-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585929#comment-15585929 ] ASF GitHub Bot commented on FLINK-4852: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2656 [FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer This is getting rid of a source of `ClassCastException`s that cropped up several times now. We simply always do the multiplexing now, which adds the cost of transmitting one byte for every element we transmit. R: @mxm and @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink remove-stream-record-serializer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2656.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2656 commit 87fd6b6d824c03f2ff6591af8ae28a9b6e65296b Author: Aljoscha Krettek Date: 2016-10-18T16:32:17Z [FLINK-4852] Remove Non-Multiplexing StreamRecordSerializer This also renames MultiplexingStreamRecordSerializer to StreamElementSerializer. > ClassCastException when assigning Watermarks with > TimeCharacteristic.ProcessingTime > --- > > Key: FLINK-4852 > URL: https://issues.apache.org/jira/browse/FLINK-4852 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.0, 1.1.3 >Reporter: Maximilian Michels > Fix For: 1.2.0, 1.1.4 > > > As per FLINK-3688 and FLINK-2936 this should already have been resolved. Still, > when emitting Watermarks and using processing time, you get the following > ClassCastException: > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) > at > org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161) > at > org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68) > at > 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.jav
[GitHub] flink issue #2642: [FLINK-3999]: Rename the `running` flag in the drivers to...
Github user nssalian commented on the issue: https://github.com/apache/flink/pull/2642 @ggevay they are not related. I think they were carried over from my old branch onto this one. The 3 commits in the middle (2ef4401, 46d91b0, 1433a5d) are relevant here. Apologies for the mix-up. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
[ https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585754#comment-15585754 ] ASF GitHub Bot commented on FLINK-3999: --- Github user nssalian commented on the issue: https://github.com/apache/flink/pull/2642 @ggevay they are not related. I think they were carried over from my old branch onto this one. The 3 commits in the middle (2ef4401, 46d91b0, 1433a5d) are relevant here. Apologies for the mix-up. > Rename the `running` flag in the drivers to `canceled` > -- > > Key: FLINK-3999 > URL: https://issues.apache.org/jira/browse/FLINK-3999 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Gabor Gevay >Assignee: Neelesh Srinivas Salian >Priority: Trivial > > The name of the {{running}} flag in the drivers doesn't reflect its usage: > when the operator just stops normally, then it is not running anymore, but > the {{running}} flag will still be true, since the {{running}} flag is only > set when cancelling. > It should be renamed, and the value inverted.
[jira] [Commented] (FLINK-1807) Stochastic gradient descent optimizer for ML library
[ https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585716#comment-15585716 ] Gábor Hermann commented on FLINK-1807: -- Thanks for your reply! Then, if I understand you correctly, this solution would not be proper because of the excessive use of memory. I believe sampling with dynamic path could have another significant overhead. If we used that approach, we would have to load a sample/minibatch of the data from another resource (disk/network) at every iteration step, and that might have a negative effect on performance. Of course, if the sampling did not read the whole data from disk, but only the needed sample, then performance would be acceptable. In the other case, if sampling must read the whole data at every iteration, it could arguably be slow. (There's a third case, when we keep the data in memory and the sampling does not have to do IO, but then we have similar memory usage as with my workaround.) As I see it, the two solutions (my suggested workaround and sampling with dynamic path) represent two sides of a memory-performance tradeoff: mine using too much memory, the other (possibly) being slow. Do I see it right? Do you think it's worth choosing the sampling approach here because the performance overhead would be much lower? Or would my workaround be too "hacky", given that it bakes into the algorithm whether the sampling happens from memory or from disk? > Stochastic gradient descent optimizer for ML library > > > Key: FLINK-1807 > URL: https://issues.apache.org/jira/browse/FLINK-1807 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Till Rohrmann >Assignee: Theodore Vasiloudis > Labels: ML > > Stochastic gradient descent (SGD) is a widely used optimization technique in > different ML algorithms.
Thus, it would be helpful to provide a generalized > SGD implementation which can be instantiated with the respective gradient > computation. Such a building block would make the development of future > algorithms easier.
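The "cheap" side of the tradeoff discussed above — data kept resident so each iteration can draw a minibatch without touching disk or network — can be sketched in a few lines. The 1-D least-squares objective and all names here are illustrative assumptions, not FlinkML code:

```java
import java.util.Random;

public class MinibatchSgdSketch {

    /**
     * One SGD step for the 1-D least-squares model y = w * x, using a
     * minibatch sampled (with replacement) from data held in memory,
     * i.e. the in-memory sampling case: no IO per iteration.
     * data[i] = {x, y}.
     */
    public static double sgdStep(double[][] data, double w, int batchSize,
                                 double learningRate, Random rnd) {
        double grad = 0.0;
        for (int i = 0; i < batchSize; i++) {
            double[] p = data[rnd.nextInt(data.length)]; // in-memory sample
            double x = p[0], y = p[1];
            grad += (w * x - y) * x; // d/dw of 0.5 * (w*x - y)^2
        }
        return w - learningRate * grad / batchSize;
    }

    /** Runs a fixed number of minibatch SGD steps from w = 0. */
    public static double fit(double[][] data, int iterations) {
        Random rnd = new Random(42);
        double w = 0.0;
        for (int i = 0; i < iterations; i++) {
            w = sgdStep(data, w, 4, 0.1, rnd);
        }
        return w;
    }
}
```

The "sampling with dynamic path" alternative would replace the `data[rnd.nextInt(...)]` lookup with a per-iteration read from disk or network, which is exactly where the feared overhead would appear.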
[jira] [Created] (FLINK-4853) Clean up JobManager registration at the ResourceManager
Till Rohrmann created FLINK-4853: Summary: Clean up JobManager registration at the ResourceManager Key: FLINK-4853 URL: https://issues.apache.org/jira/browse/FLINK-4853 Project: Flink Issue Type: Sub-task Components: ResourceManager Reporter: Till Rohrmann Assignee: Till Rohrmann The current {{JobManager}} registration at the {{ResourceManager}} blocks threads in the {{RpcService.execute}} pool. This is not ideal and can be avoided by not waiting on a {{Future}} in this call. I propose to encapsulate the leader id retrieval operation in a distinct service so that it can be separated from the {{ResourceManager}}. This will reduce the complexity of the {{ResourceManager}} and make the individual components easier to test.
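"Not waiting on a Future in this call" is the standard switch from a blocking get to an asynchronous continuation. A minimal sketch of both styles with `CompletableFuture` — the class and method names are illustrative, not the actual ResourceManager code:

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class LeaderIdRetrievalSketch {

    /**
     * Blocking style (what the issue wants to avoid): parks a pool thread
     * until the leader id arrives.
     */
    public static UUID retrieveBlocking(CompletableFuture<UUID> leaderIdFuture)
            throws Exception {
        return leaderIdFuture.get(); // ties up an RpcService.execute thread
    }

    /**
     * Non-blocking style: register a continuation that runs once the leader
     * id is known; no thread is parked in the meantime.
     */
    public static void retrieveAsync(
            CompletableFuture<UUID> leaderIdFuture,
            BiConsumer<UUID, Throwable> onLeaderId) {
        leaderIdFuture.whenComplete(onLeaderId);
    }
}
```

Extracting this into a distinct service, as the issue proposes, also makes it trivial to complete the future from a test instead of from real leader election.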
[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin
[ https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585619#comment-15585619 ] Ted Yu commented on FLINK-4499: --- +1 on starting with a very small set of rules > Introduce findbugs maven plugin > --- > > Key: FLINK-4499 > URL: https://issues.apache.org/jira/browse/FLINK-4499 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > > As suggested by Stephan in FLINK-4482, this issue is to add > findbugs-maven-plugin into the build process so that we can detect lack of > proper locking and other defects automatically. > We can begin with a small set of rules.
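A build change along the proposed lines could look like the following `pom.xml` fragment. The plugin version and the filter-file path are illustrative assumptions, not taken from Flink's actual build; the include filter is what implements "begin with a small set of rules":

```xml
<!-- Sketch only: version and filter path are placeholders. -->
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>findbugs-maven-plugin</artifactId>
  <version>3.0.4</version>
  <configuration>
    <!-- Run only the bug patterns whitelisted in this filter file,
         e.g. the locking-related detectors mentioned in the issue. -->
    <includeFilterFile>tools/maven/findbugs-include.xml</includeFilterFile>
    <failOnError>true</failOnError>
  </configuration>
  <executions>
    <execution>
      <goals><goal>check</goal></goals>
    </execution>
  </executions>
</plugin>
```

Starting from a narrow include filter and widening it gradually avoids flooding the build with hundreds of pre-existing warnings on day one.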
[jira] [Created] (FLINK-4852) ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime
Maximilian Michels created FLINK-4852: - Summary: ClassCastException when assigning Watermarks with TimeCharacteristic.ProcessingTime Key: FLINK-4852 URL: https://issues.apache.org/jira/browse/FLINK-4852 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.2.0, 1.1.3 Reporter: Maximilian Michels Fix For: 1.2.0, 1.1.4 As per FLINK-3688 and FLINK-2936 this should already have been resolved. Still, when emitting Watermarks and using processing time, you get the following ClassCastException: {noformat} Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376) at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161) at org.apache.flink.streaming.api.functions.source.StatefulSequenceSource.run(StatefulSequenceSource.java:68) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:343) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340) at org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:58) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373) ... 
11 more Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:340) at org.apache.flink.streaming.api.operators.StreamMap.processWatermark(StreamMap.java:44) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:340) ... 14 more Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42) at
[jira] [Closed] (FLINK-4512) Add option for persistent checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-4512. -- Resolution: Fixed Fix Version/s: 1.2.0 Implemented in fd410d9 (master). > Add option for persistent checkpoints > - > > Key: FLINK-4512 > URL: https://issues.apache.org/jira/browse/FLINK-4512 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > Allow periodic checkpoints to be persisted by writing out their meta data. > This is what we currently do for savepoints, but in the future checkpoints > and savepoints are likely to diverge with respect to guarantees they give for > updatability, etc. > This means that the difference between persistent checkpoints and savepoints > in the long term will be that persistent checkpoints can only be restored > with the same job settings (like parallelism, etc.) > Regular and persisted checkpoints should behave differently with respect to > disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): > regular checkpoints are cleaned up in all of these cases whereas persistent > checkpoints only on FINISHED. Maybe with the option to customize behaviour on > CANCELLED or FAILED.
[jira] [Closed] (FLINK-4507) Deprecate savepoint backend config
[ https://issues.apache.org/jira/browse/FLINK-4507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-4507. -- Resolution: Fixed Fix Version/s: 1.2.0 Fixed in fd410d9 (master). > Deprecate savepoint backend config > -- > > Key: FLINK-4507 > URL: https://issues.apache.org/jira/browse/FLINK-4507 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > The savepoint backend configuration allows both {{jobmanager}} and > {{filesystem}} as values. The {{jobmanager}} variant is used as default if > nothing is configured. > As part of FLIP-10, we want to get rid of this distinction and make all > savepoints go to a file. Savepoints backed by JobManagers are only relevant > for testing. Users could only recover from them if they did not shut down the > current cluster. > Deprecate the {{savepoints.state.backend.fs.dir}} and add > {{state.savepoints.dir}} as new config key. This is used as the default > savepoint directory. Users can overwrite this.
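In `flink-conf.yaml` terms, the change described above amounts to replacing the deprecated keys with the new one. A sketch (the HDFS path is an illustrative placeholder):

```yaml
# Deprecated by this change:
# savepoints.state.backend: filesystem
# savepoints.state.backend.fs.dir: hdfs:///flink/savepoints

# New key: default savepoint directory, overridable per triggered savepoint.
state.savepoints.dir: hdfs:///flink/savepoints
```

Keeping the old keys working but deprecated gives existing configurations a migration path before the jobmanager-backed variant disappears.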
[jira] [Closed] (FLINK-4509) Specify savepoint directory per savepoint
[ https://issues.apache.org/jira/browse/FLINK-4509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-4509. -- Resolution: Fixed Fix Version/s: 1.2.0 Fixed in fd410d9 (master). > Specify savepoint directory per savepoint > - > > Key: FLINK-4509 > URL: https://issues.apache.org/jira/browse/FLINK-4509 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > Currently, savepoints go to a per-cluster configured default directory > (configured via {{savepoints.state.backend}} and > {{savepoints.state.backend.fs.dir}}). > We shall allow specifying the directory per triggered savepoint in case no > default is configured.
[GitHub] flink pull request #2655: [FLINK-4851] [rm] Introduce FatalErrorHandler and ...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2655 [FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM This PR is based on #2651 This PR introduces a `FatalErrorHandler` and the `MetricRegistry` to the RM. The `FatalErrorHandler` is used to handle fatal errors. Additionally, the PR adds the `MetricRegistry` to the RM which can be used to register metrics. Apart from these changes the PR restructures the code of the RM a little bit and fixes some blocking operations. The PR also moves the `TestingFatalErrorHandler` into the util package of flink-runtime test. That way, it is usable across multiple tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink resourceManagerImprovements Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2655.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2655 commit ddf35c4ddb04629cddebb2401488effe93416b70 Author: Till Rohrmann Date: 2016-10-17T14:22:16Z [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions Allowing the RpcEndpoint.start/shutDown to throw exceptions will help to let rpc endpoints to quickly fail without having to use a callback like the FatalErrorHandler. commit 9fdcc11f7730b3c7a6f061c17d0463ea3f21a9f9 Author: Till Rohrmann Date: 2016-10-17T14:03:02Z [FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds the MetricRegistry to the RM which can be used to register metrics. Apart from these changes the PR restructures the code of the RM a little bit and fixes some blocking operations. The PR also moves the TestingFatalErrorHandler into the util package of flink-runtime test. 
That way, it is usable across multiple tests.
[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector
[ https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas FOURNIER updated FLINK-4850: --- Description: It seems that SVM has predict operation for Vector and not LabeledVector. It impacts QuickStart guide for FlinkML. We need to update the documentation as follows: val astroTest:DataSet[(Vector,Double)] = MLUtils .readLibSVM(env, "src/main/resources/svmguide1.t") .map(l => (l.vector, l.label)) val predictionPairs = svm.evaluate(astroTest) was: It seems that SVM has predict operation for Vector and not LabeledVector. It impacts QuickStart guide for FlinkML. Update > FlinkML - SVM predict Operation for Vector and not LaveledVector > > > Key: FLINK-4850 > URL: https://issues.apache.org/jira/browse/FLINK-4850 > Project: Flink > Issue Type: Bug >Reporter: Thomas FOURNIER > > It seems that SVM has predict operation for Vector and not LabeledVector. > It impacts QuickStart guide for FlinkML. > We need to update the documentation as follows: > val astroTest:DataSet[(Vector,Double)] = MLUtils > .readLibSVM(env, "src/main/resources/svmguide1.t") > .map(l => (l.vector, l.label)) > val predictionPairs = svm.evaluate(astroTest)
[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LaveledVector
[ https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas FOURNIER updated FLINK-4850: --- Description: It seems that SVM has predict operation for Vector and not LabeledVector. It impacts QuickStart guide for FlinkML. Update was: It seems that SVM has predict operation for Vector and not LabeledVector. It impacts QuickStart guide for FlinkML. > FlinkML - SVM predict Operation for Vector and not LaveledVector > > > Key: FLINK-4850 > URL: https://issues.apache.org/jira/browse/FLINK-4850 > Project: Flink > Issue Type: Bug >Reporter: Thomas FOURNIER > > It seems that SVM has predict operation for Vector and not LabeledVector. > It impacts QuickStart guide for FlinkML. > Update
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83854971 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java --- @@ -15,38 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.streaming.api.operators; -package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; - -import java.util.concurrent.ScheduledFuture; - -class NoOpTimerService extends TimeServiceProvider { - - private volatile boolean terminated; - - @Override - public long getCurrentProcessingTime() { - return System.currentTimeMillis(); - } - - @Override - public ScheduledFuture registerTimer(long timestamp, Triggerable target) { - return null; - } - - @Override - public boolean isTerminated() { - return terminated; - } - - @Override - public void quiesceAndAwaitPending() {} - - @Override - public void shutdownService() { - terminated = true; - } +/** + * Interface for things that can be called by {@link InternalTimerService}. + * + * @param <K> Type of the keys to which timers are scoped. + * @param <N> Type of the namespace to which timers are scoped. + */ +@Internal +public interface Triggerable<K, N> { --- End diff -- I found this name a bit confusing; it could lead to confusion with the concept of Triggers. Furthermore, there is another class with the same simple name in a different package. Maybe this could be called `TimerCallback`?
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585571#comment-15585571 ] ASF GitHub Bot commented on FLINK-3322: --- Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2510 Sorry, I'm extremely busy these days. I'm not sure when I will have time, unfortunately. > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.0.0 > > Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.)
[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...
Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2510 Sorry, I'm extremely busy these days. I'm not sure when I will have time, unfortunately.
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83862277 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -390,4 +425,141 @@ public void close() { output.close(); } } + + // + // Watermark handling + // + + /** +* Returns a {@link InternalTimerService} that can be used to query current processing time +* and event time and to set timers. An operator can have several timer services, where +* each has its own namespace serializer. Timer services are differentiated by the string +* key that is given when requesting them, if you call this method with the same key +* multiple times you will get the same timer service instance in subsequent requests. +* +* Timers are always scoped to a key, the currently active key of a keyed stream operation. +* When a timer fires, this key will also be set as the currently active key. +* +* Each timer has attached metadata, the namespace. Different timer services +* can have a different namespace type. If you don't need namespace differentiation you +* can use {@link VoidNamespaceSerializer} as the namespace serializer. +* +* @param name The name of the requested timer service. If no service exists under the given +* name a new one will be created and returned. +* @param keySerializer {@code TypeSerializer} for the keys of the timers. +* @param namespaceSerializer {@code TypeSerializer} for the timer namespace. +* @param triggerable The {@link Triggerable} that should be invoked when timers fire +* +* @param The type of the timer keys. +* @param The type of the timer namespace. 
+*/ + public <K, N> InternalTimerService<N> getInternalTimerService( + String name, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + Triggerable<K, N> triggerable) { + + @SuppressWarnings("unchecked") + HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name); + + if (service == null) { + if (restoredServices != null && restoredServices.containsKey(name)) { + @SuppressWarnings("unchecked") + HeapInternalTimerService.RestoredTimers<K, N> restoredService = + (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name); + + service = new HeapInternalTimerService<>( + keySerializer, + namespaceSerializer, + triggerable, + this, + getRuntimeContext().getProcessingTimeService(), + restoredService); + + } else { + service = new HeapInternalTimerService<>( + keySerializer, + namespaceSerializer, + triggerable, + this, + getRuntimeContext().getProcessingTimeService()); + } + timerServices.put(name, service); + } + + return service; + } + + public void processWatermark(Watermark mark) throws Exception { + for (HeapInternalTimerService<?, ?> service : timerServices.values()) { + service.advanceWatermark(mark.getTimestamp()); + } + output.emitWatermark(mark); + } + + public void processWatermark1(Watermark mark) throws Exception { + input1Watermark = mark.getTimestamp(); + long newMin = Math.min(input1Watermark, input2Watermark); + if (newMin > combinedWatermark) { + combinedWatermark = newMin; + processWatermark(new Watermark(combinedWatermark)); + } + } + + public void processWatermark2(Watermark mark) throws Exception { --- End diff -- As a general comment, somehow I don't like how the two cases (one and two inputs) are hardcoded here.
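The reviewer's objection above is that the one-input and two-input watermark paths (`processWatermark1`/`processWatermark2`) are hardcoded. A minimal sketch of the generalization he hints at: track one low-watermark per input and emit the minimum whenever it advances. All names here (`WatermarkCombiner`, `advance`) are illustrative, not part of the PR under review.

```java
import java.util.Arrays;

// Hypothetical sketch: generalizes the hardcoded processWatermark1/processWatermark2
// pair to any number of inputs by keeping a per-input watermark array.
public class WatermarkCombiner {
    private final long[] inputWatermarks;
    private long combinedWatermark = Long.MIN_VALUE;

    public WatermarkCombiner(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark for one input channel. Returns the new combined
     * watermark if it advanced, or Long.MIN_VALUE if nothing should be emitted.
     */
    public long advance(int input, long timestamp) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        long newMin = Arrays.stream(inputWatermarks).min().getAsLong();
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return combinedWatermark;
        }
        return Long.MIN_VALUE;
    }

    public long getCombinedWatermark() {
        return combinedWatermark;
    }
}
```

With this shape, `processWatermark1`/`processWatermark2` collapse into a single `processWatermark(int input, Watermark mark)` that forwards only when `advance` reports progress.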
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585561#comment-15585561 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83860194 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. + */ +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable { + + private final TypeSerializer<K> keySerializer; + + private final TypeSerializer<N> namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue; + private final Set<InternalTimer<K, N>> processingTimeTimers; + + protected ScheduledFuture<?> nextTimer = null; + + /** +* Currently waiting watermark callbacks. 
+*/ + private final Set<InternalTimer<K, N>> watermarkTimers; + private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService);
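The class quoted above keeps event-time timers in a `PriorityQueue` ordered by timestamp; when the operator's watermark advances, every timer at or below the new watermark is polled off and fired. A stripped-down, self-contained sketch of that mechanism (names like `TimerQueueSketch` are illustrative; Flink's real `advanceWatermark` calls the `Triggerable` target instead of returning a list):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model of HeapInternalTimerService's event-time path: timers sit
// in a min-heap keyed by timestamp; advancing the watermark drains and "fires"
// every timer whose timestamp is at or below it, in timestamp order.
public class TimerQueueSketch {
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    public void registerEventTimeTimer(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    /** Returns the timestamps of all timers fired by this watermark, in order. */
    public List<Long> advanceWatermark(long watermark) {
        currentWatermark = watermark;
        List<Long> fired = new ArrayList<>();
        // peek() gives the earliest pending timer; stop as soon as it is
        // beyond the watermark, since the heap is ordered by timestamp.
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= watermark) {
            fired.add(eventTimeTimers.poll());
        }
        return fired;
    }

    public long currentWatermark() {
        return currentWatermark;
    }
}
```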
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83861268 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -390,4 +425,141 @@ public void close() { output.close(); } } + + // + // Watermark handling + // + + /** +* Returns an {@link InternalTimerService} that can be used to query current processing time +* and event time and to set timers. An operator can have several timer services, where +* each has its own namespace serializer. Timer services are differentiated by the string +* key that is given when requesting them; if you call this method with the same key +* multiple times you will get the same timer service instance in subsequent requests. +* +* Timers are always scoped to a key, the currently active key of a keyed stream operation. +* When a timer fires, this key will also be set as the currently active key. +* +* Each timer has attached metadata, the namespace. Different timer services +* can have a different namespace type. If you don't need namespace differentiation you +* can use {@link VoidNamespaceSerializer} as the namespace serializer. +* +* @param name The name of the requested timer service. If no service exists under the given +* name a new one will be created and returned. +* @param keySerializer {@code TypeSerializer} for the keys of the timers. +* @param namespaceSerializer {@code TypeSerializer} for the timer namespace. +* @param triggerable The {@link Triggerable} that should be invoked when timers fire +* +* @param <K> The type of the timer keys. +* @param <N> The type of the timer namespace. 
+*/ + public <K, N> InternalTimerService<N> getInternalTimerService( + String name, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + Triggerable<K, N> triggerable) { + + @SuppressWarnings("unchecked") + HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name); + + if (service == null) { + if (restoredServices != null && restoredServices.containsKey(name)) { + @SuppressWarnings("unchecked") + HeapInternalTimerService.RestoredTimers restoredService = --- End diff -- `contains()` + `remove()` seems a bit redundant for this use. I would just always remove and check the return value for null.
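The reviewer's suggestion rests on the fact that `Map.remove` already returns the previous mapping (or `null`), so a `containsKey` check followed by `remove` performs two hash lookups where one suffices. A small before/after sketch with stand-in types (`RestoreLookup` and its methods are illustrative, not the PR's classes):

```java
import java.util.Map;

// The pattern the reviewer suggests: a single remove(), checking its return
// value for null, replaces the containsKey() + remove() double lookup.
public class RestoreLookup {
    // Before: two hash lookups on the same key.
    static Object lookupTwice(Map<String, Object> restored, String name) {
        if (restored != null && restored.containsKey(name)) {
            return restored.remove(name);
        }
        return null;
    }

    // After: one lookup; remove() returns the previous value or null.
    static Object lookupOnce(Map<String, Object> restored, String name) {
        return restored == null ? null : restored.remove(name);
    }
}
```

One caveat worth noting: the single-`remove` form conflates "key absent" with "key mapped to `null`", which is harmless here since a restored timer service is never `null`.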
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585556#comment-15585556 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83856028 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link StreamTimelyFlatMap}. 
+ */ +public class TimelyFlatMapTest extends TestLogger { + + @Test + public void testCurrentEventTime() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.processElement(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.processElement(new StreamRecord<>(6, 13L)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testCurrentProcessingTime() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement(new StreamRecord<>(6));
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585554#comment-15585554 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83854971 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java --- @@ -15,38 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.streaming.api.operators; -package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; - -import java.util.concurrent.ScheduledFuture; - -class NoOpTimerService extends TimeServiceProvider { - - private volatile boolean terminated; - - @Override - public long getCurrentProcessingTime() { - return System.currentTimeMillis(); - } - - @Override - public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { - return null; - } - - @Override - public boolean isTerminated() { - return terminated; - } - - @Override - public void quiesceAndAwaitPending() {} - - @Override - public void shutdownService() { - terminated = true; - } +/** + * Interface for things that can be called by {@link InternalTimerService}. + * + * @param <K> Type of the keys to which timers are scoped. + * @param <N> Type of the namespace to which timers are scoped. + */ +@Internal +public interface Triggerable<K, N> { --- End diff -- I found this name a bit confusing; it could lead to confusion with the concept of Triggers. Furthermore, there is another class with the same simple name in a different package. Maybe this could be called `TimerCallback`? 
> Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction<String, String>, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585564#comment-15585564 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83858268 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; + +/** + * Internal class for keeping track of in-flight timers. + * + * @param <K> Type of the keys to which timers are scoped. + * @param <N> Type of the namespace to which timers are scoped. 
+ */ +@Internal +public class InternalTimer<K, N> implements Comparable<InternalTimer<K, N>> { + private final long timestamp; + private final K key; + private final N namespace; + + public InternalTimer(long timestamp, K key, N namespace) { + this.timestamp = timestamp; + this.key = key; + this.namespace = namespace; + } + + public long getTimestamp() { + return timestamp; + } + + public K getKey() { + return key; + } + + public N getNamespace() { + return namespace; + } + + @Override + public int compareTo(InternalTimer<K, N> o) { + return Long.compare(this.timestamp, o.timestamp); --- End diff -- `compareTo` is not consistent with `equals` here, which is acceptable but strongly advised against by the documentation of `Comparable`.
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83858268 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; + +/** + * Internal class for keeping track of in-flight timers. + * + * @param <K> Type of the keys to which timers are scoped. + * @param <N> Type of the namespace to which timers are scoped. 
+ */ +@Internal +public class InternalTimer<K, N> implements Comparable<InternalTimer<K, N>> { + private final long timestamp; + private final K key; + private final N namespace; + + public InternalTimer(long timestamp, K key, N namespace) { + this.timestamp = timestamp; + this.key = key; + this.namespace = namespace; + } + + public long getTimestamp() { + return timestamp; + } + + public K getKey() { + return key; + } + + public N getNamespace() { + return namespace; + } + + @Override + public int compareTo(InternalTimer<K, N> o) { + return Long.compare(this.timestamp, o.timestamp); --- End diff -- `compareTo` is not consistent with `equals` here, which is acceptable but strongly advised against by the documentation of `Comparable`.
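The reviewer's point is concrete: when `compareTo` orders only by timestamp while `equals` also considers key and namespace, sorted collections that define membership via `compareTo` (`TreeSet`, `TreeMap`) will silently treat two distinct timers with the same timestamp as duplicates, whereas a `PriorityQueue` (which permits "equal" elements) keeps both. A minimal demonstration with an illustrative timer-like class, not Flink's actual `InternalTimer`:

```java
import java.util.PriorityQueue;
import java.util.TreeSet;

// Demonstrates the compareTo/equals inconsistency hazard: compareTo uses only
// the timestamp, equals also uses the key. TreeSet drops the "tied" timer,
// PriorityQueue keeps it.
public class TimerOrderingDemo {
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final String key;
        Timer(long timestamp, String key) { this.timestamp = timestamp; this.key = key; }

        @Override public int compareTo(Timer o) { return Long.compare(timestamp, o.timestamp); }
        @Override public boolean equals(Object o) {
            return o instanceof Timer && ((Timer) o).timestamp == timestamp && ((Timer) o).key.equals(key);
        }
        @Override public int hashCode() { return Long.hashCode(timestamp) * 31 + key.hashCode(); }
    }

    static int treeSetSize() {
        TreeSet<Timer> set = new TreeSet<>();
        set.add(new Timer(42L, "a"));
        set.add(new Timer(42L, "b")); // compareTo == 0 -> TreeSet considers it a duplicate
        return set.size();
    }

    static int priorityQueueSize() {
        PriorityQueue<Timer> q = new PriorityQueue<>();
        q.add(new Timer(42L, "a"));
        q.add(new Timer(42L, "b")); // kept: PriorityQueue allows ties
        return q.size();
    }
}
```

The PR is safe as long as timers live only in `PriorityQueue` plus a `HashSet` (which uses `equals`/`hashCode`), but the inconsistency becomes a trap the moment someone stores them in a tree-based collection.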
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585557#comment-15585557 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83861268 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -390,4 +425,141 @@ public void close() { output.close(); } } + + // + // Watermark handling + // + + /** +* Returns an {@link InternalTimerService} that can be used to query current processing time +* and event time and to set timers. An operator can have several timer services, where +* each has its own namespace serializer. Timer services are differentiated by the string +* key that is given when requesting them; if you call this method with the same key +* multiple times you will get the same timer service instance in subsequent requests. +* +* Timers are always scoped to a key, the currently active key of a keyed stream operation. +* When a timer fires, this key will also be set as the currently active key. +* +* Each timer has attached metadata, the namespace. Different timer services +* can have a different namespace type. If you don't need namespace differentiation you +* can use {@link VoidNamespaceSerializer} as the namespace serializer. +* +* @param name The name of the requested timer service. If no service exists under the given +* name a new one will be created and returned. +* @param keySerializer {@code TypeSerializer} for the keys of the timers. +* @param namespaceSerializer {@code TypeSerializer} for the timer namespace. +* @param triggerable The {@link Triggerable} that should be invoked when timers fire +* +* @param <K> The type of the timer keys. +* @param <N> The type of the timer namespace. 
+*/ + public <K, N> InternalTimerService<N> getInternalTimerService( + String name, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + Triggerable<K, N> triggerable) { + + @SuppressWarnings("unchecked") + HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name); + + if (service == null) { + if (restoredServices != null && restoredServices.containsKey(name)) { + @SuppressWarnings("unchecked") + HeapInternalTimerService.RestoredTimers restoredService = --- End diff -- `contains()` + `remove()` seems a bit redundant for this use. I would just always remove and check the return value for null.
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83857663 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +/** + * Base interface for timely flatMap functions. FlatMap functions take elements and transform them, + * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists + * and arrays. + * + * A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal + * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react + * to them firing. 
+ * + * {@code + * DataStream input = ...; + * + * DataStream result = input.flatMap(new MyTimelyFlatMapFunction()); + * } + * + * @param <I> Type of the input elements. + * @param <O> Type of the returned elements. + */ +@PublicEvolving +public interface TimelyFlatMapFunction<I, O> extends Function, Serializable { + + /** +* The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms +* it into zero, one, or more elements. +* +* @param value The input value. +* @param timerService A {@link TimerService} that allows setting timers and querying the +*current time. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception; + + /** +* Called when a timer set using {@link TimerService} fires. +* +* @param timestamp The timestamp of the firing timer. +* @param timeDomain The {@link TimeDomain} of the firing timer. +* @param timerService A {@link TimerService} that allows setting timers and querying the +*current time. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception; --- End diff -- I wonder if `TimeDomain` and `TimerService` should be parameters to methods in this interface. I assume both remain stable for the lifetime of the UDF and could be passed once in some init method that can also be pre-implemented in a `RichTimelyFlatMapFunction`. Maybe there is a good reason against this, but I like to keep the number of parameters small when possible. 
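The alternative the reviewer sketches, injecting the `TimerService` once rather than threading it through every call, can be made concrete. The sketch below is purely illustrative (the `TimerAware`/`initTimerService` names and the trimmed `TimerService` interface are assumptions, not the PR's API); its point is that once the service is cached at initialization, `flatMap` needs no extra parameter:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical init-style alternative to passing TimerService per call:
// the runtime injects it once, a rich base class could pre-implement the
// injection, and user methods keep short parameter lists.
public class InitStyleSketch {
    interface TimerService {
        long currentProcessingTime();
        void registerProcessingTimeTimer(long time);
    }

    /** Called once before any elements; the service stays stable for the UDF's lifetime. */
    interface TimerAware {
        void initTimerService(TimerService timerService);
    }

    static class MyFlatMap implements TimerAware {
        private TimerService timers; // cached at initialization

        @Override public void initTimerService(TimerService timerService) {
            this.timers = timerService;
        }

        // Note: no TimerService parameter, unlike the interface under review.
        List<String> flatMap(String value) {
            timers.registerProcessingTimeTimer(timers.currentProcessingTime() + 100);
            List<String> out = new ArrayList<>();
            out.add(value.toUpperCase());
            return out;
        }
    }
}
```

The trade-off is that per-call parameters make the dependency explicit and keep the function stateless, which is presumably why the PR passes the service on every invocation.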
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585562#comment-15585562 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83858860 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; + +/** + * Interface for working with time and timers. + * + * This is the internal version of {@link org.apache.flink.streaming.api.TimerService} + * that allows to specify a key and a namespace to which timers should be scoped. + * + * @param <N> Type of the namespace to which timers are scoped. + */ +@Internal +public interface InternalTimerService<N> { + + /** Returns the current processing time. */ + long currentProcessingTime(); + + /** Returns the current event time. */ + long currentWatermark(); --- End diff -- The corresponding method in the public interface is called `currentEventTime`. 
Does it make sense to keep both method names in sync?
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585559#comment-15585559 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83857663 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +/** + * Base interface for timely flatMap functions. FlatMap functions take elements and transform them, + * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists + * and arrays. 
+ * + * A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal + * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react + * to them firing. + * + * {@code + * DataStream input = ...; + * + * DataStream result = input.flatMap(new MyTimelyFlatMapFunction()); + * } + * + * @param Type of the input elements. + * @param Type of the returned elements. + */ +@PublicEvolving +public interface TimelyFlatMapFunction extends Function, Serializable { + + /** +* The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms +* it into zero, one, or more elements. +* +* @param value The input value. +* @param timerService A {@link TimerService} that allows setting timers and querying the +*current time. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + void flatMap(I value, TimerService timerService, Collector out) throws Exception; + + /** +* Called when a timer set using {@link TimerService} fires. +* +* @param timestamp The timestamp of the firing timer. +* @param timeDomain The {@link TimeDomain} of the firing timer. +* @param timerService A {@link TimerService} that allows setting timers and querying the +*current time. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector out) throws Exception ; --- End diff -- I wonder if `TimeDomain` and `TimerService` should be parameter to methods in this interface. I assume both remain stable for the lifetime of the UDF and could be passed once in some init method that can also be preimplemented in a `RichTimelyFlatMapFunction`. 
Maybe there is a good reason against this, but I like to keep the number of parameters small when possible. > Add an interface for Time aware User Functions > -- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs ca
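The reviewer's alternative above, passing `TimerService` once through an init hook instead of to every `flatMap`/`onTimer` call, could look roughly like the following. This is a hypothetical sketch, not the actual Flink API: `SimpleTimerService` and the `initTimerService` hook are illustrative names, and `Consumer` stands in for Flink's `Collector`.

```java
import java.io.Serializable;
import java.util.function.Consumer;

// Illustrative subset of a timer service; not Flink's actual interface.
interface SimpleTimerService {
    long currentProcessingTime();
    void registerProcessingTimeTimer(long timestamp);
}

// Sketch of the reviewer's suggestion: the runtime injects the timer
// service once, and per-element callbacks keep a small parameter list.
abstract class RichTimelyFlatMapFunction<I, O> implements Serializable {
    private transient SimpleTimerService timerService;

    /** Called once by the runtime before any elements are processed. */
    public void initTimerService(SimpleTimerService service) {
        this.timerService = service;
    }

    /** UDF code reaches the service through this accessor. */
    public SimpleTimerService timerService() {
        return timerService;
    }

    /** Per-element callback no longer needs a TimerService parameter. */
    public abstract void flatMap(I value, Consumer<O> out) throws Exception;
}
```

The trade-off the thread hints at: injection keeps signatures small because the service is stable for the UDF's lifetime, while passing it per call makes the dependency explicit at every use site.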
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83860194 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. 
+ */ +public class HeapInternalTimerService implements InternalTimerService, Triggerable { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. +*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + 
this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = restoredTimers.watermarkTimers; + watermarkTimersQueue = restoredTimers.watermarkTimersQueue; + + processingTimeTimers = restoredTimers.processingTimeTimers; +
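The quoted `HeapInternalTimerService` fields pair a `Set` with a `PriorityQueue` for each time domain. A small self-contained sketch of why both are kept (this simplifies the real class, which stores key/namespace-scoped timer objects rather than bare timestamps): the set makes duplicate registrations a no-op, while the queue yields timers in timestamp order.

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

// Simplified model of the Set + PriorityQueue pairing in
// HeapInternalTimerService: the set deduplicates registrations,
// the queue orders timers by timestamp.
final class DedupTimerQueue {
    private final Set<Long> registered = new HashSet<>();
    private final PriorityQueue<Long> queue = new PriorityQueue<>();

    /** Returns true if the timer was newly registered. */
    boolean register(long timestamp) {
        if (registered.add(timestamp)) {
            queue.add(timestamp);
            return true;
        }
        return false; // already registered: no-op
    }

    /** Removes and returns the earliest timer, or null if none remain. */
    Long pollEarliest() {
        Long head = queue.poll();
        if (head != null) {
            registered.remove(head);
        }
        return head;
    }

    int size() { return queue.size(); }
}
```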
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585558#comment-15585558 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83855317 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link StreamTimelyFlatMap}. 
+ */ +public class TimelyFlatMapTest extends TestLogger { + + @Test + public void testCurrentEventTime() throws Exception { + + StreamTimelyFlatMap operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.processElement(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.processElement(new StreamRecord<>(6, 13L)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testCurrentProcessingTime() throws Exception { + + StreamTimelyFlatMap operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement(new StreamRecord<>(6));
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83856028 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link StreamTimelyFlatMap}. 
+ */ +public class TimelyFlatMapTest extends TestLogger { + + @Test + public void testCurrentEventTime() throws Exception { + + StreamTimelyFlatMap operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.processElement(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.processElement(new StreamRecord<>(6, 13L)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testCurrentProcessingTime() throws Exception { + + StreamTimelyFlatMap operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement(new StreamRecord<>(6)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5PT:17")); + expectedOutput.add(new StreamRecord<>("6PT:42")); + + TestHa
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83859446 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link InternalTimerService} that stores timers on the Java heap. 
+ */ +public class HeapInternalTimerService implements InternalTimerService, Triggerable { + + private final TypeSerializer keySerializer; + + private final TypeSerializer namespaceSerializer; + + private final ProcessingTimeService processingTimeService; + + private long currentWatermark = Long.MIN_VALUE; + + private final org.apache.flink.streaming.api.operators.Triggerable triggerTarget; + + private final KeyContext keyContext; + + /** +* Processing time timers that are currently in-flight. +*/ + private final PriorityQueue> processingTimeTimersQueue; + private final Set> processingTimeTimers; + + protected ScheduledFuture nextTimer = null; + + /** +* Currently waiting watermark callbacks. +*/ + private final Set> watermarkTimers; + private final PriorityQueue> watermarkTimersQueue; + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + this.processingTimeService = checkNotNull(processingTimeService); + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + public HeapInternalTimerService( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + org.apache.flink.streaming.api.operators.Triggerable triggerTarget, + KeyContext keyContext, + ProcessingTimeService processingTimeService, + RestoredTimers restoredTimers) { + + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); + this.triggerTarget = checkNotNull(triggerTarget); + this.keyContext = keyContext; + 
this.processingTimeService = checkNotNull(processingTimeService); + --- End diff -- RestoredTimers are serialized with their typeserializers. It could make sense to have some equals or compatibility check here against the passed type serializers. Also wonder if they need to be serialized i
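The compatibility check the reviewer suggests, comparing the serializers recorded with the restored timers against the ones passed to the constructor, could be sketched as below. This is an assumption about how such a check might look, not code from the PR; the real check would likely use `TypeSerializer` equality or a dedicated compatibility method rather than plain `equals` on `Object`.

```java
// Hypothetical guard for the restore path: refuse to use restored timer
// state whose serializer does not match the one now being supplied.
final class SerializerCompatibility {

    static void checkCompatible(Object restoredSerializer, Object newSerializer) {
        if (restoredSerializer != null && !restoredSerializer.equals(newSerializer)) {
            throw new IllegalStateException(
                "Restored timer serializer " + restoredSerializer
                    + " is not compatible with " + newSerializer);
        }
    }
}
```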
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585563#comment-15585563 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83860732 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java --- @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.contains; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; + +/** + * Tests for {@link HeapInternalTimerService}. + */ +public class HeapInternalTimerServiceTest { + + private static InternalTimer anyInternalTimer() { + return any(); + } + + /** +* Verify that we only ever have one processing-time task registered at the +* {@link ProcessingTimeService}. 
+*/ + @Test + public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception { + @SuppressWarnings("unchecked") + Triggerable mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + + HeapInternalTimerService timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + keyContext.setCurrentKey(0); + + timerService.registerProcessingTimeTimer("ciao", 10); + timerService.registerProcessingTimeTimer("ciao", 20); + timerService.registerProcessingTimeTimer("ciao", 30); + timerService.registerProcessingTimeTimer("hello", 10); + timerService.registerProcessingTimeTimer("hello", 20); + + assertEquals(5, timerService.numProcessingTimeTimers()); + assertEquals(2, timerService.numProcessingTimeTimers("hello")); + assertEquals(3, timerService.numProcessingTimeTimers("ciao")); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L)); + + processingTimeService.setCurrentTime(10); + + assertEquals(3, timerService.numProcessingTimeTimers()); + assertEquals(1, timerService.numProcessingTimeTimers("hello")); + assertEquals(2, timerService.numProcessingTimeTimers("ciao")); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L)); + + processingTimeService.setCurrentTime(20); + + assertEquals(1, timerService.numProcessingTimeTimer
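The invariant the quoted test verifies, that any number of logical timers maps to at most one physical timer registered with the `ProcessingTimeService`, can be modeled compactly. This sketch strips out keys and namespaces and tracks only timestamps; it is an illustration of the invariant, not the PR's implementation.

```java
import java.util.PriorityQueue;

// Model of the "one physical timer" invariant: only the earliest
// pending timestamp is ever scheduled with the time service.
final class OnePhysicalTimer {
    private final PriorityQueue<Long> logicalTimers = new PriorityQueue<>();
    private Long physicalTimer; // timestamp currently scheduled, or null

    void register(long timestamp) {
        logicalTimers.add(timestamp);
        // Reschedule only if the new timer precedes the scheduled one.
        if (physicalTimer == null || timestamp < physicalTimer) {
            physicalTimer = timestamp;
        }
    }

    /** Fires every timer due at `now`, then schedules the next earliest. */
    void advanceTo(long now) {
        while (!logicalTimers.isEmpty() && logicalTimers.peek() <= now) {
            logicalTimers.poll(); // a real service would invoke the trigger here
        }
        physicalTimer = logicalTimers.peek();
    }

    Long scheduledTimestamp() { return physicalTimer; }
    int numLogicalTimers() { return logicalTimers.size(); }
}
```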
[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions
[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585560#comment-15585560 ] ASF GitHub Bot commented on FLINK-3674: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83862277 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -390,4 +425,141 @@ public void close() { output.close(); } } + + // + // Watermark handling + // + + /** +* Returns a {@link InternalTimerService} that can be used to query current processing time +* and event time and to set timers. An operator can have several timer services, where +* each has its own namespace serializer. Timer services are differentiated by the string +* key that is given when requesting them, if you call this method with the same key +* multiple times you will get the same timer service instance in subsequent requests. +* +* Timers are always scoped to a key, the currently active key of a keyed stream operation. +* When a timer fires, this key will also be set as the currently active key. +* +* Each timer has attached metadata, the namespace. Different timer services +* can have a different namespace type. If you don't need namespace differentiation you +* can use {@link VoidNamespaceSerializer} as the namespace serializer. +* +* @param name The name of the requested timer service. If no service exists under the given +* name a new one will be created and returned. +* @param keySerializer {@code TypeSerializer} for the keys of the timers. +* @param namespaceSerializer {@code TypeSerializer} for the timer namespace. +* @param triggerable The {@link Triggerable} that should be invoked when timers fire +* +* @param The type of the timer keys. +* @param The type of the timer namespace. 
+*/ + public InternalTimerService getInternalTimerService( + String name, + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + Triggerable triggerable) { + + @SuppressWarnings("unchecked") + HeapInternalTimerService service = (HeapInternalTimerService) timerServices.get(name); + + if (service == null) { + if (restoredServices != null && restoredServices.containsKey(name)) { + @SuppressWarnings("unchecked") + HeapInternalTimerService.RestoredTimers restoredService = + (HeapInternalTimerService.RestoredTimers) restoredServices.remove(name); + + service = new HeapInternalTimerService<>( + keySerializer, + namespaceSerializer, + triggerable, + this, + getRuntimeContext().getProcessingTimeService(), + restoredService); + + } else { + service = new HeapInternalTimerService<>( + keySerializer, + namespaceSerializer, + triggerable, + this, + getRuntimeContext().getProcessingTimeService()); + } + timerServices.put(name, service); + } + + return service; + } + + public void processWatermark(Watermark mark) throws Exception { + for (HeapInternalTimerService service : timerServices.values()) { + service.advanceWatermark(mark.getTimestamp()); + } + output.emitWatermark(mark); + } + + public void processWatermark1(Watermark mark) throws Exception { + input1Watermark = mark.getTimestamp(); + long newMin = Math.min(input1Watermark, input2Watermark); + if (newMin > combinedWatermark) { + combinedWatermark = newMin; + processWatermark(new Watermark(combin
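The `processWatermark1`/`processWatermark2` logic quoted above combines two input watermarks by taking their minimum and forwarding only when that minimum advances. A self-contained sketch of that combination rule:

```java
// Two-input watermark combination as in the quoted AbstractStreamOperator
// code: the combined watermark is min(input1, input2) and is emitted only
// when it strictly advances.
final class WatermarkCombiner {
    private long input1 = Long.MIN_VALUE;
    private long input2 = Long.MIN_VALUE;
    private long combined = Long.MIN_VALUE;

    /** Returns the new combined watermark, or null if it did not advance. */
    Long advanceInput1(long timestamp) {
        input1 = timestamp;
        return recompute();
    }

    Long advanceInput2(long timestamp) {
        input2 = timestamp;
        return recompute();
    }

    private Long recompute() {
        long newMin = Math.min(input1, input2);
        if (newMin > combined) {
            combined = newMin;
            return combined; // would be emitted downstream
        }
        return null; // the slower input still holds the watermark back
    }
}
```

This is why a two-input operator only makes event-time progress as fast as its slower input: the minimum cannot advance until both inputs have.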
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83858860 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for working with time and timers.
+ *
+ * This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
+ * that allows to specify a key and a namespace to which timers should be scoped.
+ *
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface InternalTimerService<N> {
+
+	/** Returns the current processing time. */
+	long currentProcessingTime();
+
+	/** Returns the current event time. */
+	long currentWatermark();
--- End diff -- The corresponding method in the public interface is called `currentEventTime`. Does it make sense to keep both method names in sync? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
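The interface under review scopes timers to a key and a namespace and fires event-time timers as the watermark advances. A hedged, self-contained sketch of that firing mechanism (plain Java with illustrative names; this is not Flink's `HeapInternalTimerService`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of the event-time side of an internal timer service: timers are
// scoped to (key, namespace, timestamp), and advanceWatermark(t) fires
// every pending event-time timer with timestamp <= t, in timestamp order.
class EventTimeTimersSketch {
    static final class Timer implements Comparable<Timer> {
        final Object key;
        final Object namespace;
        final long timestamp;

        Timer(Object key, Object namespace, long timestamp) {
            this.key = key;
            this.namespace = namespace;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Timer o) {
            return Long.compare(timestamp, o.timestamp);
        }
    }

    private final PriorityQueue<Timer> eventTimeTimers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;
    final List<Timer> fired = new ArrayList<>(); // stands in for Triggerable#onEventTime

    long currentWatermark() {
        return currentWatermark;
    }

    void registerEventTimeTimer(Object key, Object namespace, long ts) {
        eventTimeTimers.add(new Timer(key, namespace, ts));
    }

    void advanceWatermark(long time) {
        currentWatermark = time;
        Timer t;
        while ((t = eventTimeTimers.peek()) != null && t.timestamp <= time) {
            eventTimeTimers.poll();
            fired.add(t); // the real service would also set t.key as the active key
        }
    }
}
```

This also shows why `currentWatermark()` is a reasonable name internally: the value returned is literally the last watermark the service has seen.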
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83860732 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java --- @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link HeapInternalTimerService}.
+ */
+public class HeapInternalTimerServiceTest {
+
+	private static InternalTimer<Integer, String> anyInternalTimer() {
+		return any();
+	}
+
+	/**
+	 * Verify that we only ever have one processing-time task registered at the
+	 * {@link ProcessingTimeService}.
+	 */
+	@Test
+	public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		keyContext.setCurrentKey(0);
+
+		timerService.registerProcessingTimeTimer("ciao", 10);
+		timerService.registerProcessingTimeTimer("ciao", 20);
+		timerService.registerProcessingTimeTimer("ciao", 30);
+		timerService.registerProcessingTimeTimer("hello", 10);
+		timerService.registerProcessingTimeTimer("hello", 20);
+
+		assertEquals(5, timerService.numProcessingTimeTimers());
+		assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(3, timerService.numProcessingTimeTimers("ciao"));
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+
+		processingTimeService.setCurrentTime(10);
+
+		assertEquals(3, timerService.numProcessingTimeTimers());
+		assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+		processingTimeService.setCurrentTime(20);
+
+		assertEquals(1, timerService.numProcessingTimeTimers());
+		assertEquals(0, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
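The test above pins down an invariant worth spelling out: however many logical processing-time timers are registered, at most one physical timer is pending at the `ProcessingTimeService`, always for the earliest logical timestamp. A rough, dependency-free model of that coalescing (illustrative names only, not the actual `HeapInternalTimerService` code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Sketch of processing-time timer coalescing: logical timers live in an
// ordered set, and a single "physical" timer always tracks the earliest
// pending logical timestamp.
class TimerCoalescingSketch {
    private final TreeSet<Long> logicalTimers = new TreeSet<>();
    Long physicalTimer; // at most one; null when no logical timers remain
    final List<Long> fired = new ArrayList<>();

    void registerProcessingTimeTimer(long ts) {
        logicalTimers.add(ts);
        // If the new timer is earlier than the pending physical timer, the
        // physical timer is (conceptually) cancelled and re-registered.
        physicalTimer = logicalTimers.first();
    }

    // Simulates the wall clock reaching `now`: fire all due logical timers,
    // then re-register the single physical timer for the next earliest one.
    void setCurrentTime(long now) {
        while (!logicalTimers.isEmpty() && logicalTimers.first() <= now) {
            fired.add(logicalTimers.pollFirst());
        }
        physicalTimer = logicalTimers.isEmpty() ? null : logicalTimers.first();
    }
}
```

This mirrors the test's assertions: with timers at 10, 20, 30 the only registered physical timestamp is 10; after time advances to 10 it becomes 20, and so on.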
[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r83855317 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+	@Test
+	public void testCurrentEventTime() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(17));
+		testHarness.processElement(new StreamRecord<>(5, 12L));
+
+		testHarness.processWatermark(new Watermark(42));
+		testHarness.processElement(new StreamRecord<>(6, 13L));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(17L));
+		expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
+		expectedOutput.add(new Watermark(42L));
+		expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testCurrentProcessingTime() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(17);
+		testHarness.processElement(new StreamRecord<>(5));
+
+		testHarness.setProcessingTime(42);
+		testHarness.processElement(new StreamRecord<>(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("5PT:17"));
+		expectedOutput.add(new StreamRecord<>("6PT:42"));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585514#comment-15585514 ] ASF GitHub Bot commented on FLINK-3322: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2510 @ggevay Do you have some time to check the last commit in this PR? > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Assignee: ramkrishna.s.vasudevan >Priority: Critical > Fix For: 1.0.0 > > Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookup tables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
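FLINK-3322 describes released memory segments being left to the GC when preallocation is off, so iterative jobs reallocate everything every superstep. The fix under discussion amounts to returning segments to a pool for reuse. A toy sketch of such pooling (plain byte arrays stand in for Flink's `MemorySegment`; the class name is made up):

```java
import java.util.ArrayDeque;

// Sketch of memory-segment pooling: release() returns a segment to a pool
// instead of dropping the reference for the GC, and allocate() reuses a
// pooled segment before falling back to a fresh allocation.
class SegmentPoolSketch {
    private final ArrayDeque<byte[]> pool = new ArrayDeque<>();
    private final int segmentSize;

    SegmentPoolSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    byte[] allocate() {
        byte[] seg = pool.poll();
        return seg != null ? seg : new byte[segmentSize]; // reuse if possible
    }

    void release(byte[] segment) {
        pool.add(segment); // no garbage created on the release path
    }

    int pooled() {
        return pool.size();
    }
}
```

With a pool like this, a superstep's release-then-reallocate cycle produces no new garbage, which is exactly what relieves the GC pressure described in the issue.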
[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585538#comment-15585538 ] ASF GitHub Bot commented on FLINK-4851: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2655 [FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM This PR is based on #2651 This PR introduces a `FatalErrorHandler` and the `MetricRegistry` to the RM. The `FatalErrorHandler` is used to handle fatal errors. Additionally, the PR adds the `MetricRegistry` to the RM which can be used to register metrics. Apart from these changes the PR restructures the code of the RM a little bit and fixes some blocking operations. The PR also moves the `TestingFatalErrorHandler` into the util package of flink-runtime test. That way, it is usable across multiple tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink resourceManagerImprovements Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2655.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2655 commit ddf35c4ddb04629cddebb2401488effe93416b70 Author: Till Rohrmann Date: 2016-10-17T14:22:16Z [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions Allowing the RpcEndpoint.start/shutDown to throw exceptions will help to let rpc endpoints to quickly fail without having to use a callback like the FatalErrorHandler. commit 9fdcc11f7730b3c7a6f061c17d0463ea3f21a9f9 Author: Till Rohrmann Date: 2016-10-17T14:03:02Z [FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds the MetricRegistry to the RM which can be used to register metrics. 
Apart from these changes the PR restructures the code of the RM a little bit and fixes some blocking operations. The PR also moves the TestingFatalErrorHandler into the util package of flink-runtime test. That way, it is usable across multiple tests. > Add FatalErrorHandler and MetricRegistry to ResourceManager > --- > > Key: FLINK-4851 > URL: https://issues.apache.org/jira/browse/FLINK-4851 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}. > In order to harmonize the fatal error handling across all components, we > should introduce a {{FatalErrorHandler}}, which handles fatal errors. > Additionally, we should also give a {{MetricRegistry}} to the > {{ResourceManager}} so that it can report metrics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
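The pattern introduced here — components report fatal errors through a handler rather than deciding locally how to die, with a recording implementation for tests — can be sketched roughly as follows (illustrative types only, not Flink's actual `FatalErrorHandler` API):

```java
// Sketch of the fatal-error-handler pattern: the component only reports
// the error; the handler decides what "fatal" means (kill the process in
// production, record the error in tests).
class FatalErrorHandlerSketch {
    interface FatalErrorHandler {
        void onFatalError(Throwable cause);
    }

    // Test double: just remembers the last reported error.
    static final class TestingFatalErrorHandler implements FatalErrorHandler {
        Throwable lastError;

        @Override
        public void onFatalError(Throwable cause) {
            lastError = cause;
        }
    }

    // A component such as a resource manager holds a handler and calls it
    // instead of, say, invoking System.exit() itself.
    static final class Component {
        private final FatalErrorHandler handler;

        Component(FatalErrorHandler handler) {
            this.handler = handler;
        }

        void somethingWentBadlyWrong() {
            handler.onFatalError(new IllegalStateException("fatal"));
        }
    }
}
```

Centralizing the decision in the handler is what "harmonize the fatal error handling across all components" buys: every component reports the same way, and only the handler implementation differs between production and tests.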
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
[ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585534#comment-15585534 ] Maximilian Michels commented on FLINK-4829: --- To mitigate the problem, we have best-effort reporting for accumulators now. master: 783dca56eedc95f0a8974a9b50f2b532ca8cf849, d95929e0110b53f03452e1ad453de2522f79a6b8 release-1.1: c1d6b24600e40700fa06caa28bc81788d8e92386, 210230c4ab44b84c28b9a62ff461de0955e67f8f > Accumulators are not thread safe > > > Key: FLINK-4829 > URL: https://issues.apache.org/jira/browse/FLINK-4829 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > Flink's {{Accumulators}} are not thread safe. With the introduction of live > accumulator snapshots which are sent to the {{JobManager}}, we've introduced > concurrent access to accumulators without properly guarding them against > concurrent modifications. So if an accumulator snapshot is taken for an > accumulator which is at the same time modified, it can cause a > {{ConcurrentModificationException}}, as reported by a user: > {code} > WARN org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed > to serialize accumulators for task. 
> java.util.ConcurrentModificationException > at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211) > at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247) > at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242) > at java.util.TreeMap.writeObject(TreeMap.java:2436) > at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at java.util.HashMap.internalWriteEntries(HashMap.java:1785) > at java.util.HashMap.writeObject(HashMap.java:1362) > at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301) > at > org.apache.flink.util.SerializedValue.(SerializedValue.java:52) > at > org.apache.flink.runtime.accumulators.AccumulatorSnapshot.(AccumulatorSnapshot.java:58) > at > org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
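The stack trace shows the heartbeat thread serializing a live `TreeMap`/`HashMap` while the task thread mutates it, which is exactly when fail-fast iterators throw `ConcurrentModificationException`. One standard mitigation, in the spirit of the best-effort snapshotting mentioned above, is to take a defensive copy under a lock and serialize the copy. A sketch (not Flink's actual accumulator code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of thread-safe accumulator snapshotting: updates and snapshots
// synchronize on the same lock, and the snapshot is a defensive copy, so
// serialization never iterates the live, mutating map.
class AccumulatorSnapshotSketch {
    private final Map<String, Long> accumulators = new HashMap<>();
    private final Object lock = new Object();

    // Called by the task thread on every record.
    void add(String name, long delta) {
        synchronized (lock) {
            accumulators.merge(name, delta, Long::sum);
        }
    }

    // Called by the heartbeat thread; the returned copy is safe to
    // serialize without further synchronization.
    Map<String, Long> snapshot() {
        synchronized (lock) {
            return new HashMap<>(accumulators);
        }
    }
}
```

The copy makes the snapshot slightly stale by construction, which is why "best-effort" reporting is an honest description: the heartbeat sees a consistent but not necessarily latest view.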
[GitHub] flink pull request #2649: [FLINK-4829] snapshot accumulators on a best-effor...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2649
[jira] [Commented] (FLINK-4829) Accumulators are not thread safe
[ https://issues.apache.org/jira/browse/FLINK-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585520#comment-15585520 ] ASF GitHub Bot commented on FLINK-4829: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2649 > Accumulators are not thread safe > > > Key: FLINK-4829 > URL: https://issues.apache.org/jira/browse/FLINK-4829 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > Flink's {{Accumulators}} are not thread safe. With the introduction of live > accumulator snapshots which are sent to the {{JobManager}}, we've introduced > concurrent access to accumulators without properly guarding them against > concurrent modifications. So if an accumulator snapshot is taken for an > accumulator which is at the same time modified, it can cause a > {{ConcurrentModificationException}}, as reported by a user: > {code} > WARN org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed > to serialize accumulators for task. 
> java.util.ConcurrentModificationException > at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211) > at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247) > at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242) > at java.util.TreeMap.writeObject(TreeMap.java:2436) > at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at java.util.HashMap.internalWriteEntries(HashMap.java:1785) > at java.util.HashMap.writeObject(HashMap.java:1362) > at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301) > at > org.apache.flink.util.SerializedValue.(SerializedValue.java:52) > at > org.apache.flink.runtime.accumulators.AccumulatorSnapshot.(AccumulatorSnapshot.java:58) > at > org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2510: FLINK-3322 Allow drivers and iterators to reuse the memor...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2510 @ggevay Do you have some time to check the last commit in this PR?
[jira] [Created] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
Till Rohrmann created FLINK-4851: Summary: Add FatalErrorHandler and MetricRegistry to ResourceManager Key: FLINK-4851 URL: https://issues.apache.org/jira/browse/FLINK-4851 Project: Flink Issue Type: Sub-task Components: ResourceManager Reporter: Till Rohrmann Assignee: Till Rohrmann The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}. In order to harmonize the fatal error handling across all components, we should introduce a {{FatalErrorHandler}}, which handles fatal errors. Additionally, we should also give a {{MetricRegistry}} to the {{ResourceManager}} so that it can report metrics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2118) Table API fails on composite filter conditions
[ https://issues.apache.org/jira/browse/FLINK-2118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Shoshin reassigned FLINK-2118: Assignee: Alexander Shoshin > Table API fails on composite filter conditions > -- > > Key: FLINK-2118 > URL: https://issues.apache.org/jira/browse/FLINK-2118 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Alexander Shoshin > > Having a composite filter conditions such as > {code} > myTable.filter('name !== "Pete" && 'name !== "Bob") > {code} > fails with the following error message: > {code} > ExpressionException: Non-boolean operand types String and String in Pete && > 'name > {code} > whereas > {code} > myTable.filter( ('name !== "Pete") && ('name !== "Bob") ) > {code} > works. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585425#comment-15585425 ] ASF GitHub Bot commented on FLINK-4715: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2652 Haha, that picture convinced me to actually add the test :smile: > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
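The safety net described in FLINK-4715 — escalate to a fatal error if a task fails to cancel within a timeout — can be sketched as a small watchdog. The callback stands in for the TaskManager's fatal-error path (which in production ultimately kills the process); all names here are illustrative:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of a cancellation watchdog: wait for the task to confirm its
// cancellation; if the timeout elapses first, escalate via the
// fatal-error callback instead of leaking the task's resources.
class CancellationWatchdogSketch {
    static boolean awaitCancellation(
            CountDownLatch taskCancelled,
            long timeoutMillis,
            Runnable onFatalError) throws InterruptedException {

        if (taskCancelled.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return true; // task cancelled in time, nothing to do
        }
        // In a real TaskManager this would be a FatalError notification
        // leading to process termination (the "commit suicide" behavior).
        onFatalError.run();
        return false;
    }
}
```

Killing the whole process is the only reliable guarantee here precisely because the stuck task is faulty user code: it may be ignoring interrupts, so no in-process cleanup can be trusted.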
[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2652 Haha, that picture convinced me to actually add the test :smile:
[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2652 Yeah, this sort of covers it. Just afraid of such a situation here: https://twitter.com/thepracticaldev/status/687672086152753152
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585403#comment-15585403 ] ASF GitHub Bot commented on FLINK-4715: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2652 Yeah, this sort of covers it. Just afraid of such a situation here: https://twitter.com/thepracticaldev/status/687672086152753152 > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585380#comment-15585380 ] ASF GitHub Bot commented on FLINK-4715: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2652 We have the `TaskManagerProcessReapingTest` which tests that the TaskManager process properly exits when the TaskManager actor dies. In addition, there is `TaskManagerTest#testTerminationOnFatalError`, which tests that the `FatalError` message terminates the actor. Do you think we are already covered by this? We can certainly add a process reaper test variant that sends a `FatalError` message instead of the `PoisonPill`. > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2652: [FLINK-4715] Fail TaskManager with fatal error if task ca...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2652 We have the `TaskManagerProcessReapingTest` which tests that the TaskManager process properly exits when the TaskManager actor dies. In addition, there is `TaskManagerTest#testTerminationOnFatalError`, which tests that the `FatalError` message terminates the actor. Do you think we are already covered by this? We can certainly add a process reaper test variant that sends a `FatalError` message instead of the `PoisonPill`.
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585352#comment-15585352 ] ASF GitHub Bot commented on FLINK-4715: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2652 What do you think about a followup test, where we ensure that a fatal error notification on the TaskManager actually results in a process kill? > TaskManager should commit suicide after cancellation failure > > > Key: FLINK-4715 > URL: https://issues.apache.org/jira/browse/FLINK-4715 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Ufuk Celebi > Fix For: 1.2.0 > > > In case of a failed cancellation, e.g. the task cannot be cancelled after a > given time, the {{TaskManager}} should kill itself. That way we guarantee > that there is no resource leak. > This behaviour acts as a safety-net against faulty user code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585346#comment-15585346 ] ASF GitHub Bot commented on FLINK-4715: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2652 Renamed the `TaskOptions` class to `TaskManagerOptions` so that we can easily migrate the task manager options as a follow up.
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585344#comment-15585344 ] ASF GitHub Bot commented on FLINK-4715: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2652#discussion_r83842005 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java --- @@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws Exception { verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); } + /** +* Tests that interrupt happens via watch dog if canceller is stuck in cancel. +* Task cancellation blocks the task canceller. Interrupt after cancel via +* cancellation watch dog. +*/ + @Test + public void testWatchDogInterruptsTask() throws Exception { + Configuration config = new Configuration(); + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5); + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50); + + Task task = createTask(InvokableBlockingInCancel.class, config); + task.startTaskThread(); + + awaitLatch.await(); + + task.cancelExecution(); + + triggerLatch.await(); + + // No fatal error + for (Object msg : taskManagerMessages) { + assertEquals(false, msg instanceof TaskManagerMessages.FatalError); + } + } + + /** +* The invoke() method holds a lock (trigger awaitLatch after acquisition) +* and cancel cannot complete because it also tries to acquire the same lock. +* This is resolved by the watch dog, no fatal error. 
+*/ + @Test + public void testInterruptableSharedLockInInvokeAndCancel() throws Exception { + Configuration config = new Configuration(); + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5); + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50); + + Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config); + task.startTaskThread(); + + awaitLatch.await(); + + task.cancelExecution(); + + triggerLatch.await(); + + // No fatal error + for (Object msg : taskManagerMessages) { + assertEquals(false, msg instanceof TaskManagerMessages.FatalError); --- End diff -- You can also give a message to `assertFalse` - I like assertEquals for printing the expected value, but if the expected value is false, the former seems more natural to me...
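The trade-off under discussion (a readable failure message vs. the more natural reading of `assertFalse`) can be shown with a minimal stand-in for JUnit's two-argument `assertFalse(String message, boolean condition)`. This is an illustrative sketch, not Flink test code; the helper reimplements the JUnit behavior so the example is self-contained.

```java
public class AssertMessageSketch {

    // Minimal stand-in for JUnit's assertFalse(String message, boolean condition):
    // on failure, the message names the offending value directly, so the reader
    // does not have to reconstruct it from the stack trace alone.
    static void assertFalse(String message, boolean condition) {
        if (condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        Object msg = "ordinary task manager message";
        // Passes: the message is not a fatal-error notification. On failure,
        // the AssertionError would carry the offending message text.
        assertFalse("Unexpected fatal error message: " + msg, msg instanceof Error);
        System.out.println("ok");
    }
}
```

With the message argument, `assertFalse` reports the offending value just as `assertEquals(false, ...)` would, while keeping the boolean intent explicit.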
[jira] [Commented] (FLINK-1807) Stochastic gradient descent optimizer for ML library
[ https://issues.apache.org/jira/browse/FLINK-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585342#comment-15585342 ] Till Rohrmann commented on FLINK-1807: -- This could work as a workaround, but I think it's not the proper solution to the problem. Instead, we should try to fix the static-path problem. I.e., we could introduce an operator flag that allows marking an operator as dynamic. When determining the static paths, not only the partial solution dataset but also the sampling operator, which actually lies on a static path, should be marked as dynamic. Then, for each iteration, we have to start executing all dynamic operators. > Stochastic gradient descent optimizer for ML library > > > Key: FLINK-1807 > URL: https://issues.apache.org/jira/browse/FLINK-1807 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Till Rohrmann >Assignee: Theodore Vasiloudis > Labels: ML > > Stochastic gradient descent (SGD) is a widely used optimization technique in > different ML algorithms. Thus, it would be helpful to provide a generalized > SGD implementation which can be instantiated with the respective gradient > computation. Such a building block would make the development of future > algorithms easier.
[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2652#discussion_r83841451 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -1251,33 +1299,124 @@ public void run() { catch (InterruptedException e) { // we can ignore this } + } + catch (Throwable t) { + logger.error("Error in the task canceler", t); + } + } + } + + /** +* Watchdog for the cancellation. If the task is stuck in cancellation, +* we notify the task manager about a fatal error. +*/ + private static class TaskCancellationWatchDog extends TimerTask { + + /** +* Pass logger in order to prevent that the compiler needs to inject static bridge methods +* to access it. +*/ + private final Logger logger; + + /** Thread executing the Task. */ + private final Thread executor; + + /** Interrupt interval. */ + private final long interruptInterval; + + /** Timeout after which a fatal error notification happens. */ + private final long interruptTimeout; + + /** TaskManager to notify about a timeout */ + private final TaskManagerConnection taskManager; + + /** Task name (for logging and error messages). */ + private final String taskName; + + /** Synchronization with the {@link TaskCanceler} thread. 
*/ + private final CountDownLatch taskCancellerLatch; + + public TaskCancellationWatchDog( + Logger logger, + Thread executor, + long interruptInterval, + long interruptTimeout, + TaskManagerConnection taskManager, + String taskName, + CountDownLatch taskCancellerLatch) { + + this.logger = checkNotNull(logger); + this.executor = checkNotNull(executor); + this.interruptInterval = checkNotNull(interruptInterval); + this.interruptTimeout = checkNotNull(interruptTimeout); + this.taskManager = checkNotNull(taskManager); + this.taskName = checkNotNull(taskName); + this.taskCancellerLatch = checkNotNull(taskCancellerLatch); + } + + @Override + public void run() { + try { + // Synchronize with task canceler + taskCancellerLatch.await(); + } catch (Exception e) { + String msg = String.format("Exception while waiting on task " + + "canceller to cancel task '%s'.", taskName); + taskManager.notifyFatalError(msg, e); + return; + } + + long intervalNanos = TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS); + long timeoutNanos = TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS); + long deadline = System.nanoTime() + timeoutNanos; + + try { + // Initial wait before interrupting periodically + Thread.sleep(interruptInterval); + } catch (InterruptedException ignored) { + } + + // It is possible that the user code does not react to the task canceller. + // For that reason, we spawn this separate thread that repeatedly interrupts + // the user code until it exits. If the user code does not exit within + // the timeout, we notify the job manager about a fatal error. + while (executor.isAlive()) { + long now = System.nanoTime(); + + // build the stack trace of where the thread is stuck, for the log + StringBuilder bld = new StringBuilder(); + StackTraceElement[] stack = executor.getStackTrace(); + for (StackTraceElement e : stack) { + bld.append(e).append('\n'); + } -
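Stripped of Flink specifics, the watchdog loop in the diff above boils down to: convert the timeout into a nanosecond deadline, then repeatedly interrupt the executing thread until it dies or the deadline passes. A self-contained sketch under those assumptions follows; the escalation to `notifyFatalError` is replaced by a boolean return, and all names are illustrative.

```java
import java.util.concurrent.TimeUnit;

public class WatchdogSketch {

    // Interrupt `executor` roughly every interruptIntervalMillis until it
    // exits or timeoutMillis elapses. Returns true if the thread terminated
    // in time; the real watchdog would instead notify the TaskManager of a
    // fatal error on timeout.
    static boolean interruptUntilDeadOrTimeout(
            Thread executor, long interruptIntervalMillis, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (executor.isAlive() && System.nanoTime() < deadline) {
            executor.interrupt();
            // join with a timeout doubles as the per-interval wait
            executor.join(interruptIntervalMillis);
        }
        return !executor.isAlive();
    }

    // A "task" stuck in a blocking call, like user code that ignores cancel()
    // but does react to interrupts.
    static boolean runScenario() throws InterruptedException {
        Thread stuck = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // interrupted by the watchdog; exit
            }
        });
        stuck.start();
        return interruptUntilDeadOrTimeout(stuck, 5, 1_000);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runScenario() ? "terminated" : "stuck past timeout");
    }
}
```

User code that swallows the interrupt (e.g. catches `InterruptedException` in a loop) would exhaust the deadline instead, which is exactly the case the fatal-error escalation covers.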
[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/2652#discussion_r83840872 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java --- @@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws Exception { verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); } + /** +* Tests that interrupt happens via watch dog if canceller is stuck in cancel. +* Task cancellation blocks the task canceller. Interrupt after cancel via +* cancellation watch dog. +*/ + @Test + public void testWatchDogInterruptsTask() throws Exception { + Configuration config = new Configuration(); + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5); + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50); + + Task task = createTask(InvokableBlockingInCancel.class, config); + task.startTaskThread(); + + awaitLatch.await(); + + task.cancelExecution(); + + triggerLatch.await(); + + // No fatal error + for (Object msg : taskManagerMessages) { + assertEquals(false, msg instanceof TaskManagerMessages.FatalError); + } + } + + /** +* The invoke() method holds a lock (trigger awaitLatch after acquisition) +* and cancel cannot complete because it also tries to acquire the same lock. +* This is resolved by the watch dog, no fatal error. 
+*/ + @Test + public void testInterruptableSharedLockInInvokeAndCancel() throws Exception { + Configuration config = new Configuration(); + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5); + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50); + + Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config); + task.startTaskThread(); + + awaitLatch.await(); + + task.cancelExecution(); + + triggerLatch.await(); + + // No fatal error + for (Object msg : taskManagerMessages) { + assertEquals(false, msg instanceof TaskManagerMessages.FatalError); --- End diff -- `assertFalse` only fails with printing `AssertionError` and you have to check the stack trace whereas `assertEquals` has a message printing expected and actual inputs.
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585320#comment-15585320 ] ASF GitHub Bot commented on FLINK-4715: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2652#discussion_r83840545 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java --- @@ -565,6 +568,128 @@ public void testOnPartitionStateUpdate() throws Exception { verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId())); } + /** +* Tests that interrupt happens via watch dog if canceller is stuck in cancel. +* Task cancellation blocks the task canceller. Interrupt after cancel via +* cancellation watch dog. +*/ + @Test + public void testWatchDogInterruptsTask() throws Exception { + Configuration config = new Configuration(); + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5); + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50); + + Task task = createTask(InvokableBlockingInCancel.class, config); + task.startTaskThread(); + + awaitLatch.await(); + + task.cancelExecution(); + + triggerLatch.await(); + + // No fatal error + for (Object msg : taskManagerMessages) { + assertEquals(false, msg instanceof TaskManagerMessages.FatalError); + } + } + + /** +* The invoke() method holds a lock (trigger awaitLatch after acquisition) +* and cancel cannot complete because it also tries to acquire the same lock. +* This is resolved by the watch dog, no fatal error. 
+*/ + @Test + public void testInterruptableSharedLockInInvokeAndCancel() throws Exception { + Configuration config = new Configuration(); + config.setLong(TaskOptions.CANCELLATION_INTERVAL.key(), 5); + config.setLong(TaskOptions.CANCELLATION_TIMEOUT.key(), 50); + + Task task = createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config); + task.startTaskThread(); + + awaitLatch.await(); + + task.cancelExecution(); + + triggerLatch.await(); --- End diff -- how about adding `task.getExecutingThread().join()` instead of using the trigger latch? Seems more intuitive and safer.
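The point behind preferring `join()` over the trigger latch can be shown in isolation: awaiting a latch only proves the task thread reached the `countDown()` call, while joining proves it has fully exited, including any work after the latch. The following is an illustrative sketch, not Flink's `TaskTest`; the thread body and timings are assumptions.

```java
import java.util.concurrent.CountDownLatch;

public class JoinVsLatchSketch {

    static boolean runScenario() throws InterruptedException {
        CountDownLatch triggerLatch = new CountDownLatch(1);

        Thread taskThread = new Thread(() -> {
            triggerLatch.countDown();
            // Work after the latch: the thread is still alive here even
            // though a latch-based wait in the test has already returned.
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
            }
        });
        taskThread.start();

        triggerLatch.await();
        // Joining guarantees the thread has terminated before the test
        // asserts on anything it produced.
        taskThread.join();
        return !taskThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fully exited after join: " + runScenario());
    }
}
```

Asserting right after `triggerLatch.await()` would race against the remaining 50 ms of work; asserting after `join()` cannot.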
[GitHub] flink pull request #2652: [FLINK-4715] Fail TaskManager with fatal error if ...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2652#discussion_r83839720 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -1251,33 +1299,124 @@ public void run() { catch (InterruptedException e) { // we can ignore this } + } + catch (Throwable t) { + logger.error("Error in the task canceler", t); + } + } + } + + /** +* Watchdog for the cancellation. If the task is stuck in cancellation, +* we notify the task manager about a fatal error. +*/ + private static class TaskCancellationWatchDog extends TimerTask { + + /** +* Pass logger in order to prevent that the compiler needs to inject static bridge methods +* to access it. +*/ + private final Logger logger; + + /** Thread executing the Task. */ + private final Thread executor; + + /** Interrupt interval. */ + private final long interruptInterval; + + /** Timeout after which a fatal error notification happens. */ + private final long interruptTimeout; + + /** TaskManager to notify about a timeout */ + private final TaskManagerConnection taskManager; + + /** Task name (for logging and error messages). */ + private final String taskName; + + /** Synchronization with the {@link TaskCanceler} thread. 
*/ + private final CountDownLatch taskCancellerLatch; + + public TaskCancellationWatchDog( + Logger logger, + Thread executor, + long interruptInterval, + long interruptTimeout, + TaskManagerConnection taskManager, + String taskName, + CountDownLatch taskCancellerLatch) { + + this.logger = checkNotNull(logger); + this.executor = checkNotNull(executor); + this.interruptInterval = checkNotNull(interruptInterval); + this.interruptTimeout = checkNotNull(interruptTimeout); + this.taskManager = checkNotNull(taskManager); + this.taskName = checkNotNull(taskName); + this.taskCancellerLatch = checkNotNull(taskCancellerLatch); + } + + @Override + public void run() { + try { + // Synchronize with task canceler + taskCancellerLatch.await(); + } catch (Exception e) { + String msg = String.format("Exception while waiting on task " + + "canceller to cancel task '%s'.", taskName); + taskManager.notifyFatalError(msg, e); + return; + } + + long intervalNanos = TimeUnit.NANOSECONDS.convert(interruptInterval, TimeUnit.MILLISECONDS); + long timeoutNanos = TimeUnit.NANOSECONDS.convert(interruptTimeout, TimeUnit.MILLISECONDS); + long deadline = System.nanoTime() + timeoutNanos; + + try { + // Initial wait before interrupting periodically + Thread.sleep(interruptInterval); + } catch (InterruptedException ignored) { + } + + // It is possible that the user code does not react to the task canceller. + // For that reason, we spawn this separate thread that repeatedly interrupts + // the user code until it exits. If the user code does not exit within + // the timeout, we notify the job manager about a fatal error. + while (executor.isAlive()) { + long now = System.nanoTime(); + + // build the stack trace of where the thread is stuck, for the log + StringBuilder bld = new StringBuilder(); + StackTraceElement[] stack = executor.getStackTrace(); + for (StackTraceElement e : stack) { + bld.append(e).append('\n'); + } -
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585283#comment-15585283 ]

ASF GitHub Bot commented on FLINK-4715:
---------------------------------------

Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2652

    Should we have one class `TaskManagerOptions`? To not spread the config over too many classes.

> TaskManager should commit suicide after cancellation failure
> ------------------------------------------------------------
>
>                 Key: FLINK-4715
>                 URL: https://issues.apache.org/jira/browse/FLINK-4715
>             Project: Flink
>          Issue Type: Improvement
>          Components: TaskManager
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Ufuk Celebi
>             Fix For: 1.2.0
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a
> given time, the {{TaskManager}} should kill itself. That way we guarantee
> that there is no resource leak.
> This behaviour acts as a safety-net against faulty user code.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
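The safety-net described in FLINK-4715 — escalate to process death when cancellation hangs, so leaked resources are reclaimed by restarting the process — can be sketched as follows. The names (`FatalErrorSketch`, `FatalErrorHandler`, `watchCancellation`) are hypothetical; Flink's real implementation routes the notification through the `TaskManagerConnection` rather than calling a handler directly:

```java
/**
 * Sketch of the FLINK-4715 safety-net (hypothetical names): if a cancelled
 * task does not terminate within a timeout, report a fatal error so the
 * whole TaskManager process can kill itself and be restarted cleanly.
 */
public class FatalErrorSketch {

    /** Stand-in for the TaskManager's fatal-error notification path. */
    public interface FatalErrorHandler {
        void notifyFatalError(String message, Throwable cause);
    }

    /**
     * Waits up to {@code timeoutMs} for the task thread to die; if it is
     * still alive afterwards, escalates to a fatal error.
     */
    public static void watchCancellation(
            Thread taskThread, long timeoutMs, FatalErrorHandler handler)
            throws InterruptedException {

        taskThread.join(timeoutMs);
        if (taskThread.isAlive()) {
            // The task is stuck (e.g. blocked in non-interruptible user code).
            // The only safe way to reclaim its resources is to let the whole
            // process die; the real TaskManager ends in a forced JVM halt.
            handler.notifyFatalError(
                    "Task did not terminate within " + timeoutMs + " ms after cancellation",
                    null);
        }
    }
}
```

This is why the issue calls it a safety-net against faulty user code: correctness no longer depends on the user code ever reacting to interrupts.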
[jira] [Commented] (FLINK-4715) TaskManager should commit suicide after cancellation failure
[ https://issues.apache.org/jira/browse/FLINK-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585275#comment-15585275 ]

ASF GitHub Bot commented on FLINK-4715:
---------------------------------------

Github user uce commented on the issue: https://github.com/apache/flink/pull/2652

    Thanks for the valuable feedback. Some of the errors were a little sloppy on my side. Sorry for that. I addressed all your comments.

> TaskManager should commit suicide after cancellation failure
> ------------------------------------------------------------
>
>                 Key: FLINK-4715
>                 URL: https://issues.apache.org/jira/browse/FLINK-4715
>             Project: Flink
>          Issue Type: Improvement
>          Components: TaskManager
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Ufuk Celebi
>             Fix For: 1.2.0
>
> In case of a failed cancellation, e.g. the task cannot be cancelled after a
> given time, the {{TaskManager}} should kill itself. That way we guarantee
> that there is no resource leak.
> This behaviour acts as a safety-net against faulty user code.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LabeledVector
[ https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thomas FOURNIER updated FLINK-4850:
-----------------------------------
    Description:
It seems that SVM has predict operation for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkML.

  was:
It seems that SVM has predict operation for Vector and not LabeledVector.
It impacts QuickStart guide for FlinkMl.

> FlinkML - SVM predict Operation for Vector and not LabeledVector
> ----------------------------------------------------------------
>
>                 Key: FLINK-4850
>                 URL: https://issues.apache.org/jira/browse/FLINK-4850
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Thomas FOURNIER
>
> It seems that SVM has predict operation for Vector and not LabeledVector.
> It impacts QuickStart guide for FlinkML.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)