[jira] [Commented] (FLINK-4098) Iteration support in Python API
[ https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347577#comment-15347577 ]

Geoffrey Mon commented on FLINK-4098:
-------------------------------------

Thanks for the {{sys.stdout.flush()}} tip. After adding more debug statements, I found that the real issue is in the Iterator: when next() is called, the function starts but hangs waiting for data to be sent so that it can be read.

> Iteration support in Python API
> -------------------------------
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
> Issue Type: New Feature
> Components: Python API
> Affects Versions: 1.0.2
> Reporter: Geoffrey Mon
> Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently
> working on this at https://github.com/GEOFBOT/flink

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4109) Change the name of ternary condition operator 'eval' to '?'
[ https://issues.apache.org/jira/browse/FLINK-4109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347564#comment-15347564 ]

Jark Wu commented on FLINK-4109:
--------------------------------

[~vkalavri] [~chengxiang li] [~rmetzger] what do you think?

> Change the name of ternary condition operator 'eval' to '?'
> ------------------------------------------------------------
>
> Key: FLINK-4109
> URL: https://issues.apache.org/jira/browse/FLINK-4109
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Affects Versions: 1.1.0
> Reporter: Jark Wu
> Assignee: Jark Wu
> Fix For: 1.1.0
>
> The ternary condition operator in the Table API is named {{eval}}; for example,
> {{(42 > 5).eval("A", "B")}} evaluates to "A". IMO, the eval function is not well
> understood. Instead, "?" is a better choice I think, since it is the symbol
> Java uses for its conditional operator.
> It would be clearer and more literally understood, e.g.
> {{(42 > 5).?("A", "B")}} or {{(42 > 5) ? ("A", "B")}}
> If it makes sense, I will open a pull request.
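As an aside, the semantics under discussion can be sketched in plain Python with a hypothetical expression wrapper (the class and method names below are illustrative only; this is not Flink's Table API):

```python
class BooleanExpression:
    """Hypothetical stand-in for a Table API boolean expression."""

    def __init__(self, value):
        self.value = value

    # Current naming: a method called 'eval' acting as a ternary operator,
    # i.e. eval(if_true, if_false) picks a branch based on the condition.
    def eval(self, if_true, if_false):
        return if_true if self.value else if_false


# (42 > 5).eval("A", "B") leads to "A", as in the issue description.
result = BooleanExpression(42 > 5).eval("A", "B")
```

The proposal is purely a rename: the same two-argument selection, but spelled `?` so it reads like Java's conditional operator.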
[GitHub] flink pull request #2157: [FLINK-4115] FsStateBackend filesystem verificatio...
GitHub user jfg9 opened a pull request:

    https://github.com/apache/flink/pull/2157

    [FLINK-4115] FsStateBackend filesystem verification can cause classpath exceptions

    There are two changes to FsStateBackend to avoid classpath exceptions when submitting a job from a Flink client which does not have the necessary file system classes on its classpath:
    - The filesystem is no longer initialised in the FsStateBackend constructor
    - The verification checks for the checkpoint directory in the FsStateBackend constructor are skipped if Flink does not have built-in support for the URI scheme

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jfg9/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2157.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2157

commit e8557895b4ff8e40a0705152ec65d355fe7d5f65
Author: Josh
Date:   2016-06-23T21:18:02Z

    [FLINK-4115] Skip filesystem checks for filesystems with no built-in support

commit 94ec8dca6c016c1c500ade90b4c823e5faaecaf3
Author: Josh
Date:   2016-06-23T21:22:40Z

    [FLINK-4115] Remove filesystem initialisation from FsStateBackend constructor

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
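The two changes in this PR amount to deferring filesystem access out of the constructor. A minimal Python sketch of that pattern, assuming a hypothetical `connect_filesystem` helper (this is not Flink's actual class, just an illustration of the lazy-initialization fix):

```python
def connect_filesystem(uri):
    # Stand-in for the real filesystem lookup, which would need
    # scheme-specific classes on the classpath (e.g. EMR's S3 classes).
    return {"uri": uri}


class FsStateBackend:
    """Simplified sketch of the fix: no filesystem access in the constructor."""

    BUILT_IN_SCHEMES = {"file", "hdfs"}

    def __init__(self, checkpoint_uri):
        self.checkpoint_uri = checkpoint_uri
        self._fs = None  # filesystem is no longer initialised here
        scheme = checkpoint_uri.split("://", 1)[0]
        # Skip eager verification for schemes without built-in support,
        # so the client never needs the scheme-specific classes.
        self.verified_eagerly = scheme in self.BUILT_IN_SCHEMES

    def filesystem(self):
        # Lazily initialised on the cluster, where the classes exist.
        if self._fs is None:
            self._fs = connect_filesystem(self.checkpoint_uri)
        return self._fs
```

With this shape, constructing the backend with an `s3://` URI on a client without the S3 classes succeeds; the (expensive, classpath-sensitive) lookup happens only when `filesystem()` is first called on the cluster.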
[jira] [Commented] (FLINK-4115) FsStateBackend filesystem verification can cause classpath exceptions
[ https://issues.apache.org/jira/browse/FLINK-4115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347390#comment-15347390 ]

ASF GitHub Bot commented on FLINK-4115:
---------------------------------------

GitHub user jfg9 opened a pull request:

    https://github.com/apache/flink/pull/2157

    [FLINK-4115] FsStateBackend filesystem verification can cause classpath exceptions

    There are two changes to FsStateBackend to avoid classpath exceptions when submitting a job from a Flink client which does not have the necessary file system classes on its classpath:
    - The filesystem is no longer initialised in the FsStateBackend constructor
    - The verification checks for the checkpoint directory in the FsStateBackend constructor are skipped if Flink does not have built-in support for the URI scheme

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jfg9/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2157.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2157

commit e8557895b4ff8e40a0705152ec65d355fe7d5f65
Author: Josh
Date:   2016-06-23T21:18:02Z

    [FLINK-4115] Skip filesystem checks for filesystems with no built-in support

commit 94ec8dca6c016c1c500ade90b4c823e5faaecaf3
Author: Josh
Date:   2016-06-23T21:22:40Z

    [FLINK-4115] Remove filesystem initialisation from FsStateBackend constructor

> FsStateBackend filesystem verification can cause classpath exceptions
> ----------------------------------------------------------------------
>
> Key: FLINK-4115
> URL: https://issues.apache.org/jira/browse/FLINK-4115
> Project: Flink
> Issue Type: Bug
> Components: Local Runtime
> Affects Versions: 1.1.0
> Reporter: Josh Forman-Gornall
> Priority: Minor
>
> In the constructor of FsStateBackend, the FileSystem for the checkpoint
> directory is initialised and it is verified that the checkpoint path exists.
> This verification happens in the Flink client program when submitting a job
> and can cause classpath issues if classes required to access the file system
> are not available in the client's classpath.
> For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend
> and an s3:// checkpoint directory, we get the below ClassNotFoundException.
> This is because the jars needed to use the EMR file system are available only
> in the YARN context and not when submitting the job via the Flink client.
> {noformat}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> 	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
> 	at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
> 	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
> 	at org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
> 	at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:175)
> 	at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:144)
> 	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:205)
> {noformat}
[jira] [Commented] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics
[ https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347222#comment-15347222 ]

vishnu viswanath commented on FLINK-3962:
-----------------------------------------

I am getting this exception in local execution as well (from the IDE or when run through `flink run`).

{code}
17:16:38,095 ERROR org.apache.flink.metrics.reporter.JMXReporter - A metric with the name org.apache.flink.metrics:key0=localhost,key1=taskmanager,key2=f7bccd0c64bc9c3643647913eced7104,key3=Streaming_Job,key4=Map,key5=0,name=numRecordsIn was already registered.
javax.management.InstanceAlreadyExistsException: org.apache.flink.metrics:key0=localhost,key1=taskmanager,key2=f7bccd0c64bc9c3643647913eced7104,key3=Streaming_Job,key4=Map,key5=0,name=numRecordsIn
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
	at org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:109)
	at org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:174)
	at org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:194)
	at org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:150)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:141)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:235)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
	at java.lang.Thread.run(Thread.java:745)
{code}

> JMXReporter doesn't properly register/deregister metrics
> ---------------------------------------------------------
>
> Key: FLINK-3962
> URL: https://issues.apache.org/jira/browse/FLINK-3962
> Project: Flink
> Issue Type: Bug
> Components: TaskManager
> Affects Versions: 1.1.0
> Reporter: Maximilian Michels
> Assignee: Stephan Ewen
> Fix For: 1.1.0
>
> The following fails our Yarn tests because it checks for errors in the
> jobmanager/taskmanager logs:
> {noformat}
> 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter - A metric with the name org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn was already registered.
> javax.management.InstanceAlreadyExistsException: org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
> 	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> 	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> 	at org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76)
> 	at org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177)
> 	at org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191)
> 	at org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144)
> 	at org.apache.flink.metrics.groups.IOMetricGroup.<init>(IOMetricGroup.java:40)
> 	at org.apache.flink.metrics.groups.TaskMetricGroup.<init>(TaskMetricGroup.java:68)
> 	at org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74)
> 	at org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86)
> 	at
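The failure mode above — a second metric registered under an identical JMX object name — can be reproduced in miniature. The registry below is an illustrative Python sketch, not Flink's `MetricRegistry`:

```python
class InstanceAlreadyExistsError(Exception):
    """Mirrors javax.management.InstanceAlreadyExistsException."""


class MetricRegistry:
    """Sketch: names must either be scoped uniquely (e.g. include a subtask
    index) or be deregistered on task teardown, otherwise re-registration
    under the same name fails as in the logs above."""

    def __init__(self):
        self._metrics = {}

    def register(self, name, metric):
        if name in self._metrics:
            raise InstanceAlreadyExistsError(name)
        self._metrics[name] = metric

    def deregister(self, name):
        self._metrics.pop(name, None)


registry = MetricRegistry()
registry.register("taskmanager.Map.0.numRecordsIn", object())
# Without this deregister, a task restart re-registering the same
# name would raise InstanceAlreadyExistsError:
registry.deregister("taskmanager.Map.0.numRecordsIn")
registry.register("taskmanager.Map.0.numRecordsIn", object())
```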
[jira] [Updated] (FLINK-3898) Adamic-Adar Similarity
[ https://issues.apache.org/jira/browse/FLINK-3898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan updated FLINK-3898:
------------------------------
    Fix Version/s: 1.1.0

> Adamic-Adar Similarity
> -----------------------
>
> Key: FLINK-3898
> URL: https://issues.apache.org/jira/browse/FLINK-3898
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.1.0
>
> The implementation of Adamic-Adar Similarity [0] is very close to Jaccard
> Similarity. Whereas Jaccard Similarity counts common neighbors, Adamic-Adar
> Similarity sums the inverse logarithm of the degree of common neighbors.
> Consideration will be given to the computation of the inverse logarithm, in
> particular whether to pre-compute a small array of values.
> [0] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
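For reference, the measure described in the issue — summing the inverse logarithm of common-neighbor degrees — looks like this in a toy adjacency-set form (an illustration only, not Gelly's implementation):

```python
import math
from itertools import combinations

def adamic_adar(neighbors):
    """Toy Adamic-Adar: neighbors maps vertex -> set of its neighbors.
    score(u, v) = sum over common neighbors w of 1 / log(deg(w))."""
    scores = {}
    for u, v in combinations(sorted(neighbors), 2):
        common = neighbors[u] & neighbors[v]
        # Each shared neighbor contributes 1 / log(its degree), so
        # rare (low-degree) shared neighbors count for more.
        s = sum(1.0 / math.log(len(neighbors[w])) for w in common)
        if s:
            scores[(u, v)] = s
    return scores

g = {"a": {"c", "d"}, "b": {"c", "d"}, "c": {"a", "b"}, "d": {"a", "b"}}
scores = adamic_adar(g)
# a and b share neighbors c and d, each of degree 2:
# score(a, b) = 2 / log(2) ≈ 2.885
```

This also shows why the issue mentions pre-computing inverse logarithms: `1 / log(deg)` is evaluated once per degree value, so a small lookup array covers most vertices.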
[jira] [Updated] (FLINK-4115) FsStateBackend filesystem verification can cause classpath exceptions
[ https://issues.apache.org/jira/browse/FLINK-4115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Forman-Gornall updated FLINK-4115:
---------------------------------------
    Component/s: (was: Core)
                 Local Runtime

> FsStateBackend filesystem verification can cause classpath exceptions
> ----------------------------------------------------------------------
>
> Key: FLINK-4115
> URL: https://issues.apache.org/jira/browse/FLINK-4115
> Project: Flink
> Issue Type: Bug
> Components: Local Runtime
> Affects Versions: 1.1.0
> Reporter: Josh Forman-Gornall
> Priority: Minor
>
> In the constructor of FsStateBackend, the FileSystem for the checkpoint
> directory is initialised and it is verified that the checkpoint path exists.
> This verification happens in the Flink client program when submitting a job
> and can cause classpath issues if classes required to access the file system
> are not available in the client's classpath.
> For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend
> and an s3:// checkpoint directory, we get the below ClassNotFoundException.
> This is because the jars needed to use the EMR file system are available only
> in the YARN context and not when submitting the job via the Flink client.
> {noformat}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> 	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
> 	at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
> 	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
> 	at org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
> 	at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:175)
> 	at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:144)
> 	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:205)
> {noformat}
[jira] [Updated] (FLINK-4115) FsStateBackend filesystem verification can cause classpath exceptions
[ https://issues.apache.org/jira/browse/FLINK-4115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Forman-Gornall updated FLINK-4115:
---------------------------------------
    Description:
In the constructor of FsStateBackend, the FileSystem for the checkpoint directory is initialised and it is verified that the checkpoint path exists. This verification happens in the Flink client program when submitting a job and can cause classpath issues if classes required to access the file system are not available in the client's classpath.

For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend and an s3:// checkpoint directory, we get the below ClassNotFoundException. This is because the jars needed to use the EMR file system are available only in the YARN context and not when submitting the job via the Flink client.

{noformat}
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
	at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:175)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:144)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:205)
{noformat}

  was:
In the constructor of FsStateBackend, the FileSystem for the checkpoint directory is initialised and it is verified that the checkpoint path exists. This verification happens in the Flink client program when submitting a job and can cause classpath issues if classes required to access the file system are not available in the client's classpath.

For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend and an s3:// checkpoint directory, we get the below ClassNotFoundException. This is because the jars needed to use the EMR file system are available only in the YARN context and not when submitting the job via the Flink client.

```
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
	at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:175)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:144)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:205)
```

> FsStateBackend filesystem verification can cause classpath exceptions
> ----------------------------------------------------------------------
>
> Key: FLINK-4115
> URL: https://issues.apache.org/jira/browse/FLINK-4115
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.1.0
> Reporter: Josh Forman-Gornall
> Priority: Minor
>
> In the constructor of FsStateBackend, the FileSystem for the checkpoint
> directory is initialised and it is verified that the checkpoint path exists.
> This verification happens in the Flink client program when submitting a job
> and can cause classpath issues if classes required to access the file system
> are not available in the client's classpath.
> For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend
> and an s3:// checkpoint directory, we get the below ClassNotFoundException.
> This is because the jars needed to use the EMR file system are available only
> in the YARN context and not when submitting the job via the Flink client.
> {noformat}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> 	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
> 	at
[jira] [Created] (FLINK-4115) FsStateBackend filesystem verification can cause classpath exceptions
Josh Forman-Gornall created FLINK-4115:
---------------------------------------

Summary: FsStateBackend filesystem verification can cause classpath exceptions
Key: FLINK-4115
URL: https://issues.apache.org/jira/browse/FLINK-4115
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.1.0
Reporter: Josh Forman-Gornall
Priority: Minor

In the constructor of FsStateBackend, the FileSystem for the checkpoint directory is initialised and it is verified that the checkpoint path exists. This verification happens in the Flink client program when submitting a job and can cause classpath issues if classes required to access the file system are not available in the client's classpath.

For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend and an s3:// checkpoint directory, we get the below ClassNotFoundException. This is because the jars needed to use the EMR file system are available only in the YARN context and not when submitting the job via the Flink client.

```
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
	at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:175)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:144)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:205)
```
[jira] [Created] (FLINK-4114) Need a way to manage multiple named, long-lived jobs on a single YARN cluster in an automated manner
Chris Hogue created FLINK-4114:
-------------------------------

Summary: Need a way to manage multiple named, long-lived jobs on a single YARN cluster in an automated manner
Key: FLINK-4114
URL: https://issues.apache.org/jira/browse/FLINK-4114
Project: Flink
Issue Type: Bug
Reporter: Chris Hogue

We are running several Flink jobs on a single YARN cluster. Currently each Flink job is run in its own YARN session (and thus its own YARN application ID). The difficulty comes in that we want to manage each of these jobs individually by name. For example, we want to start, stop, or update one job without affecting the others.

The primary access to these jobs is via the YARN application ID, which is not meaningful for discerning which Flink job it is running. It would be nice if we had tools that would allow us to manage the Flink jobs by name and have them do the right thing with the YARN session. Today we can use 'flink run' and have it start a YARN session for that job, but from that point forward we have only the YARN application ID to work with.

As a concrete example, suppose we have 2 jobs with names JobA and JobB. We'd want a way to do something like:

flink run ; flink run

We'd then want to be able to call:

flink cancel JobA

The cancel command would spin down the YARN session for JobA in addition to the Flink job, leaving JobB running as normal. I've simplified the commands, leaving out other options for illustrative purposes. And we'll want to be able to use savepoints through these steps as well.
[jira] [Commented] (FLINK-4113) Always copy first value in ChainedAllReduceDriver
[ https://issues.apache.org/jira/browse/FLINK-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347023#comment-15347023 ]

ASF GitHub Bot commented on FLINK-4113:
---------------------------------------

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/2156

    [FLINK-4113] [runtime] Always copy first value in ChainedAllReduceDriver

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 4113_always_copy_first_value_in_chainedallreducedriver

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2156.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2156

commit f5f162b591b38e890758dc65fee93749ac86a347
Author: Greg Hogan
Date:   2016-06-23T16:37:37Z

    [FLINK-4113] [runtime] Always copy first value in ChainedAllReduceDriver

> Always copy first value in ChainedAllReduceDriver
> --------------------------------------------------
>
> Key: FLINK-4113
> URL: https://issues.apache.org/jira/browse/FLINK-4113
> Project: Flink
> Issue Type: Bug
> Components: Local Runtime
> Affects Versions: 1.1.0, 1.0.4
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Priority: Critical
> Fix For: 1.1.0, 1.0.4
>
> {{ChainedAllReduceDriver.collect}} must copy the first record even when
> object reuse is enabled or {{base}} may later point to the same object as
> {{record}}.
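The aliasing hazard behind this fix can be illustrated in Python: when the runtime reuses one mutable record buffer, a running aggregate that merely aliases the first record gets corrupted. This is a sketch of the idea, not the driver's actual code:

```python
import copy

def all_reduce(records, reduce_fn, copy_first=True):
    """Sketch of the fixed driver logic: always copy the first value."""
    base = None
    for record in records:
        if base is None:
            # The fix: copy the first record even under object reuse,
            # otherwise 'base' aliases the mutable, reused buffer.
            base = copy.deepcopy(record) if copy_first else record
        else:
            base = reduce_fn(base, record)
    return base

# Simulate object reuse: the runtime hands back the *same* list object,
# mutated in place, for every record.
buf = [1]
def reused_stream():
    for v in (1, 2, 3):
        buf[0] = v
        yield buf

result = all_reduce(reused_stream(), lambda a, b: [a[0] + b[0]])
# With the copy: 1 + 2 + 3 = [6]. Without it, base starts out aliasing
# the buffer, which has mutated to 2 by the second record, giving [7].
```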
[GitHub] flink pull request #2156: [FLINK-4113] [runtime] Always copy first value in ...
GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/2156

    [FLINK-4113] [runtime] Always copy first value in ChainedAllReduceDriver

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 4113_always_copy_first_value_in_chainedallreducedriver

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2156.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2156

commit f5f162b591b38e890758dc65fee93749ac86a347
Author: Greg Hogan
Date:   2016-06-23T16:37:37Z

    [FLINK-4113] [runtime] Always copy first value in ChainedAllReduceDriver
[jira] [Commented] (FLINK-3879) Native implementation of HITS algorithm
[ https://issues.apache.org/jira/browse/FLINK-3879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346899#comment-15346899 ]

Greg Hogan commented on FLINK-3879:
-----------------------------------

[~vkalavri] here are the timings I get on an AWS c4.2xlarge using FLINK-3879 rebased to master and merged with pr1517 (hash-based combine). Each execution is performing 10 iterations.

FLINK-3879 (HITS):

$ for i in `seq 10 2 20` ; do echo ; echo $i ; ./bin/flink run -q -class org.apache.flink.graph.examples.HITS ~/flink-gelly-examples_2.10-1.1-SNAPSHOT.jar --input rmat --scale $i --output hash --algorithm HITS ; done

Scale 10: ChecksumHashCode 0x018d7c49ca4a, count 902. Execution runtime: 1,034 ms
Scale 12: ChecksumHashCode 0x058a6efc82d1, count 3349. Execution runtime: 1,115 ms
Scale 14: ChecksumHashCode 0x15030ecf3188, count 12472. Execution runtime: 1,974 ms
Scale 16: ChecksumHashCode 0x4f23492849eb, count 46826. Execution runtime: 5,843 ms
Scale 18: ChecksumHashCode 0x0001267c806b8338, count 174010. Execution runtime: 21,927 ms
Scale 20: ChecksumHashCode 0x000449bd0da45343, count 646203. Execution runtime: 93,488 ms

FLINK-2044 (HITSAlgorithm):

$ for i in `seq 10 2 20` ; do echo ; echo $i ; ./bin/flink run -q -class org.apache.flink.graph.examples.HITS ~/flink-gelly-examples_2.10-1.1-SNAPSHOT.jar --input rmat --scale $i --output hash --algorithm HITSAlgorithm ; done

Scale 10: ChecksumHashCode 0x01c88b0818f0, count 902. Execution runtime: 761 ms
Scale 12: Cluster retrieved. ChecksumHashCode 0x069bcad3b322, count 3349. Execution runtime: 1,094 ms
Scale 14: ChecksumHashCode 0x186c50950fea, count 12472. Execution runtime: 2,290 ms
Scale 16: ChecksumHashCode 0x5b741faf30eb, count 46826. Execution runtime: 6,898 ms
Scale 18: ChecksumHashCode 0x000153e520a8306c, count 174010. Execution runtime: 28,015 ms
Scale 20: ChecksumHashCode 0x0004ed44e75c493a, count 646203. Execution runtime: 120,736 ms

> Native implementation of HITS algorithm
> ----------------------------------------
>
> Key: FLINK-3879
> URL: https://issues.apache.org/jira/browse/FLINK-3879
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.1.0
>
> Hyperlink-Induced Topic Search (HITS, also "hubs and authorities") is
> presented in [0] and described in [1].
> "[HITS] is a very popular and effective algorithm to rank documents based on
> the link information among a set of documents. The algorithm presumes that a
> good hub is a document that points to many others, and a good authority is a
> document that many documents point to."
> [https://pdfs.semanticscholar.org/a8d7/c7a4c53a9102c4239356f9072ec62ca5e62f.pdf]
> This implementation differs from FLINK-2044 by providing for convergence,
> outputting both hub and authority scores, and completing in half the number
> of iterations.
> [0] http://www.cs.cornell.edu/home/kleinber/auth.pdf
> [1] https://en.wikipedia.org/wiki/HITS_algorithm
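For readers unfamiliar with the algorithm being benchmarked: HITS alternates hub and authority updates with normalization each round. A toy single-machine sketch (illustrative only, not Gelly's distributed implementation):

```python
def hits(edges, iterations=10):
    """Toy HITS: edges is a list of (src, dst) pairs."""
    nodes = {n for e in edges for n in e}
    hub = {n: 1.0 for n in nodes}
    auth = {n: 1.0 for n in nodes}
    for _ in range(iterations):
        # Authority: sum of hub scores of pages pointing at you.
        auth = {n: sum(hub[s] for s, d in edges if d == n) for n in nodes}
        # Hub: sum of authority scores of pages you point at.
        hub = {n: sum(auth[d] for s, d in edges if s == n) for n in nodes}
        # Normalize (L2) so scores stay bounded across iterations.
        na = sum(v * v for v in auth.values()) ** 0.5 or 1.0
        nh = sum(v * v for v in hub.values()) ** 0.5 or 1.0
        auth = {n: v / na for n, v in auth.items()}
        hub = {n: v / nh for n, v in hub.items()}
    return hub, auth

# a points at everything (pure hub); c is pointed at by everything (pure authority).
hub, auth = hits([("a", "b"), ("a", "c"), ("b", "c")])
```

Note the "half the number of iterations" claim above: each round here computes both scores, whereas a formulation that alternates one update per superstep needs two supersteps per HITS iteration.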
[jira] [Commented] (FLINK-4098) Iteration support in Python API
[ https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346881#comment-15346881 ]

Chesnay Schepler commented on FLINK-4098:
-----------------------------------------

At that point it is completely normal for the Java side to wait. Under usual circumstances the operator would be configured, read the broadcast variables (which Java writes without waiting for any acknowledgement), and then explicitly request data from Java (until which Java blocks).

So let's see. We know the Python process is created, which is good. We also know that the function is created, but never run. This means that something went wrong with the setup of the operator, which reduces our area of failure to the else block in Environment.execute(). I would (and will over the weekend :) ) plaster that stuff in debug statements.

Also, remember to call sys.stdout.flush() after print! This small detail has caused me quite some trouble :)

> Iteration support in Python API
> -------------------------------
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
> Issue Type: New Feature
> Components: Python API
> Affects Versions: 1.0.2
> Reporter: Geoffrey Mon
> Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently
> working on this at https://github.com/GEOFBOT/flink
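The flush tip matters because the Python worker talks to the Java side over a pipe: buffered prints can sit in the stdio buffer and never surface if the process then blocks or dies. A minimal pattern for debug statements in such a worker:

```python
import sys

def debug(msg):
    # Unbuffered debug logging for a subprocess: without the flush,
    # the output stays in the stdio buffer and is lost if the
    # process hangs (as in this issue) or is killed.
    sys.stdout.write("DEBUG: %s\n" % msg)
    sys.stdout.flush()

debug("operator configured")
debug("requesting next record from Java")
```

On Python 3, `print(msg, flush=True)` achieves the same thing in one call.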
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346830#comment-15346830 ] ASF GitHub Bot commented on FLINK-3231: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2131 @rmetzger tagging just to make sure you're notified of this PR :) When will you be free to review? Just for my own time allocation for when to continue working on the Kinesis connector. If there's anything majorly wrong with the implementation explained in the above comment, please let me know and I'll try to address them before effort on a detailed review. Thanks in advance! > Handle Kinesis-side resharding in Kinesis streaming consumer > > > Key: FLINK-3231 > URL: https://issues.apache.org/jira/browse/FLINK-3231 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > A big difference between Kinesis shards and Kafka partitions is that Kinesis > users can choose to "merge" and "split" shards at any time for adjustable > stream throughput capacity. This article explains this quite clearly: > https://brandur.org/kinesis-by-example. > This will break the static shard-to-task mapping implemented in the basic > version of the Kinesis consumer > (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task > mapping is done in a simple round-robin-like distribution which can be > locally determined at each Flink consumer task (Flink Kafka consumer does > this too). > To handle Kinesis resharding, we will need some way to let the Flink consumer > tasks coordinate which shards they are currently handling, and allow the > tasks to ask the coordinator for a shards reassignment when the task finds > out it has found a closed shard at runtime (shards will be closed by Kinesis > when it is merged and split). 
> We need a centralized coordinator state store which is visible to all Flink > consumer tasks. Tasks can use this state store to locally determine which > shards they can be reassigned. Amazon KCL uses a DynamoDB table for the > coordination, but as described in > https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use > KCL for the implementation of the consumer if we want to leverage Flink's > checkpointing mechanics. For our own implementation, Zookeeper can be used > for this state store, but that would require the user to set up ZK. > Since this feature requires extensive work, it is opened as a separate > sub-task from the basic implementation > https://issues.apache.org/jira/browse/FLINK-3229. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
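The static round-robin distribution described above can be sketched in isolation; the class and method names below are hypothetical, not Flink's actual implementation. The sketch also makes the resharding problem visible: each subtask's local view depends only on a fixed shard list and a modulo rule, so any merge or split changes the shard list and invalidates every subtask's assignment.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch of a static round-robin shard-to-task mapping that each
 * consumer subtask can compute locally, without any coordination.
 */
class RoundRobinShardAssigner {

    /** Returns the shards (by index) that the given subtask should read. */
    static List<Integer> shardsForSubtask(int numShards, int numSubtasks, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int shard = 0; shard < numShards; shard++) {
            // Pure function of the fixed shard count: no coordinator needed,
            // but also no way to react when Kinesis closes or adds shards.
            if (shard % numSubtasks == subtaskIndex) {
                assigned.add(shard);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // 5 shards over 2 subtasks: subtask 0 reads shards 0, 2, 4; subtask 1 reads 1, 3.
        System.out.println(shardsForSubtask(5, 2, 0)); // [0, 2, 4]
        System.out.println(shardsForSubtask(5, 2, 1)); // [1, 3]
    }
}
```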
[jira] [Commented] (FLINK-4110) Provide testing skeleton in quickstarts
[ https://issues.apache.org/jira/browse/FLINK-4110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346794#comment-15346794 ] ASF GitHub Bot commented on FLINK-4110: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2153 This will be very helpful! LGTM. > Provide testing skeleton in quickstarts > --- > > Key: FLINK-4110 > URL: https://issues.apache.org/jira/browse/FLINK-4110 > Project: Flink > Issue Type: Improvement > Components: Build System, Quickstarts >Affects Versions: 1.1.0 >Reporter: Robert Metzger > > Users often ask how they can test their Flink code. > I suggest providing a very simple skeleton / example in the quickstarts, > showing how it's done. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4110) Provide testing skeleton in quickstarts
[ https://issues.apache.org/jira/browse/FLINK-4110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346793#comment-15346793 ] ASF GitHub Bot commented on FLINK-4110: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2153#discussion_r68276459 --- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/test/java/TestSkeleton.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ${package}; + + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.test.util.SuccessException; +import org.apache.flink.test.util.TestUtils; +import org.apache.flink.util.Collector; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Skeleton class showing how to write integration tests against a running embedded Flink + * testing cluster. + */ +public class TestSkeleton { + + protected static final Logger LOG = LoggerFactory.getLogger(TestSkeleton.class); + private static ForkableFlinkMiniCluster flink; + private static int flinkPort; + + /** +* Start a re-usable Flink mini cluster +*/ + @BeforeClass + public static void setupFlink() { + Configuration flinkConfig = new Configuration(); + flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); + flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); + flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); --- End diff -- Is this config related to any of the demonstrated tests? If not, perhaps it can be removed. 
[jira] [Commented] (FLINK-4110) Provide testing skeleton in quickstarts
[ https://issues.apache.org/jira/browse/FLINK-4110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346750#comment-15346750 ] ASF GitHub Bot commented on FLINK-4110: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2153#discussion_r68272113 --- Diff: flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/test/java/TestSkeleton.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ${package}; + + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.test.util.SuccessException; +import org.apache.flink.test.util.TestUtils; +import org.apache.flink.util.Collector; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Skeleton class showing how to write integration tests against a running embedded Flink + * testing cluster. + */ +public class TestSkeleton { + + protected static final Logger LOG = LoggerFactory.getLogger(TestSkeleton.class); + private static ForkableFlinkMiniCluster flink; + private static int flinkPort; + + /** +* Start a re-usable Flink mini cluster +*/ + @BeforeClass + public static void setupFlink() { + Configuration flinkConfig = new Configuration(); + flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); + flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); + flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); + + flink = new ForkableFlinkMiniCluster(flinkConfig, false); + flink.start(); + + flinkPort = flink.getLeaderRPCPort(); + LOG.info("Started a Flink testing cluster on port {}", flinkPort); + } + + + /** +* This test uses a finite stream. The TestingSource will stop after 1000 elements have been send. 
--- End diff -- send --> sent
[jira] [Commented] (FLINK-1946) Make yarn tests logging less verbose
[ https://issues.apache.org/jira/browse/FLINK-1946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346735#comment-15346735 ] ASF GitHub Bot commented on FLINK-1946: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2147 +1 > Make yarn tests logging less verbose > > > Key: FLINK-1946 > URL: https://issues.apache.org/jira/browse/FLINK-1946 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Reporter: Till Rohrmann >Priority: Minor > > Currently, the yarn tests log on the INFO level making the test outputs > confusing. Furthermore some status messages are written to stdout. I think > these messages are not necessary to be shown to the user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2147: [FLINK-1946] reduce verbosity of Yarn cluster setu...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2147#discussion_r68269922 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -750,13 +751,16 @@ protected YarnClusterClient deployInternal() throws Exception { LOG.info("YARN application has been deployed successfully."); break loop; default: - LOG.info("Deploying cluster, current state " + appState); - if(waittime > 6) { + if (appState != lastAppState) { + LOG.info("Deploying cluster, current state " + appState); + } + if(waitTime > 6) { --- End diff -- missing space after if --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1946) Make yarn tests logging less verbose
[ https://issues.apache.org/jira/browse/FLINK-1946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346726#comment-15346726 ] ASF GitHub Bot commented on FLINK-1946: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2147#discussion_r68269922 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -750,13 +751,16 @@ protected YarnClusterClient deployInternal() throws Exception { LOG.info("YARN application has been deployed successfully."); break loop; default: - LOG.info("Deploying cluster, current state " + appState); - if(waittime > 6) { + if (appState != lastAppState) { + LOG.info("Deploying cluster, current state " + appState); + } + if(waitTime > 6) { --- End diff -- missing space after if > Make yarn tests logging less verbose > > > Key: FLINK-1946 > URL: https://issues.apache.org/jira/browse/FLINK-1946 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Reporter: Till Rohrmann >Priority: Minor > > Currently, the yarn tests log on the INFO level making the test outputs > confusing. Furthermore some status messages are written to stdout. I think > these messages are not necessary to be shown to the user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
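The change reviewed above boils down to one pattern: log the deployment state only when it differs from the last state observed. A minimal self-contained sketch of that pattern (the class below is illustrative, not part of Flink's YARN client):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the log-deduplication pattern from the patch: repeated observations
 * of the same state produce a single log line instead of one line per poll.
 */
class StateChangeLogger {
    private String lastState;
    private final List<String> logLines = new ArrayList<>();

    /** Records a log line only when the observed state actually changed. */
    void observe(String state) {
        if (!state.equals(lastState)) {
            logLines.add("Deploying cluster, current state " + state);
            lastState = state;
        }
    }

    List<String> logLines() {
        return logLines;
    }

    public static void main(String[] args) {
        StateChangeLogger logger = new StateChangeLogger();
        // A polling loop sees the same state many times before it transitions.
        for (String s : new String[]{"ACCEPTED", "ACCEPTED", "ACCEPTED", "RUNNING"}) {
            logger.observe(s);
        }
        System.out.println(logger.logLines().size()); // 2 lines instead of 4
    }
}
```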
[GitHub] flink issue #2151: [hotfix][docs] Add warning to Cassandra documentation
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2151 +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4098) Iteration support in Python API
[ https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346713#comment-15346713 ] Geoffrey Mon commented on FLINK-4098: - After adding some more print statements in different places, I found that running a normal {{.map()}} on a DataSet caused {{MapFunction._run()}}, {{MapFunction._configure()}}, and {{MapFunction.collect()}} to run, whereas running {{.map()}} as an iteration caused none of these functions to run. The MapFunction instance was initialized but never used. Does this mean the Java side of Flink was waiting for the Python side to return its results, causing the hang? > Iteration support in Python API > --- > > Key: FLINK-4098 > URL: https://issues.apache.org/jira/browse/FLINK-4098 > Project: Flink > Issue Type: New Feature > Components: Python API >Affects Versions: 1.0.2 >Reporter: Geoffrey Mon >Priority: Minor > > Bulk and delta iterations are not supported in the Python API. Currently > working on this at https://github.com/GEOFBOT/flink -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4113) Always copy first value in ChainedAllReduceDriver
Greg Hogan created FLINK-4113: - Summary: Always copy first value in ChainedAllReduceDriver Key: FLINK-4113 URL: https://issues.apache.org/jira/browse/FLINK-4113 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 1.1.0, 1.0.4 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Critical Fix For: 1.1.0, 1.0.4 {{ChainedAllReduceDriver.collect}} must copy the first record even when object reuse is enabled or {{base}} may later point to the same object as {{record}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
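The aliasing hazard behind this issue can be reproduced with a plain mutable holder (the code below is an illustration, not Flink's actual {{ChainedAllReduceDriver}}): when object reuse is enabled, the runtime hands the same record object to {{collect}} repeatedly, so storing the first record by reference lets the next write clobber the running aggregate.

```java
/**
 * Demonstrates why the first value must be copied under object reuse:
 * without the copy, `base` aliases the single reused record object.
 */
class ObjectReuseDemo {
    static final class IntValueHolder {
        int value;
        IntValueHolder(int value) { this.value = value; }
        IntValueHolder copy() { return new IntValueHolder(value); }
    }

    /** Sums inputs while the "runtime" reuses one record object for every element. */
    static int sumWithReuse(int[] inputs, boolean copyFirst) {
        IntValueHolder reused = new IntValueHolder(0); // the runtime's single reused record
        IntValueHolder base = null;                    // the driver's running aggregate
        for (int in : inputs) {
            reused.value = in;                         // runtime overwrites the reused object
            if (base == null) {
                // Buggy path: base aliases `reused`, so the next overwrite corrupts it.
                base = copyFirst ? reused.copy() : reused;
            } else {
                base.value = base.value + reused.value;
            }
        }
        return base.value;
    }

    public static void main(String[] args) {
        int[] inputs = {5, 1, 1};
        System.out.println(sumWithReuse(inputs, true));  // 7 (correct)
        System.out.println(sumWithReuse(inputs, false)); // 2 (corrupted by aliasing)
    }
}
```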
[jira] [Commented] (FLINK-4093) Expose metric interfaces
[ https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346712#comment-15346712 ] ASF GitHub Bot commented on FLINK-4093: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2134#discussion_r68268650 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java --- @@ -146,7 +147,16 @@ public Counter counter(int name) { @Override public Counter counter(String name) { - Counter counter = new Counter(); + return counter(name, new SimpleCounter()); + } + + @Override + public C counter(int name, C counter) { --- End diff -- we should change it for the other methods as well. > Expose metric interfaces > > > Key: FLINK-4093 > URL: https://issues.apache.org/jira/browse/FLINK-4093 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4093) Expose metric interfaces
[ https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346686#comment-15346686 ] ASF GitHub Bot commented on FLINK-4093: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2134#discussion_r68265180 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java --- @@ -146,7 +147,16 @@ public Counter counter(int name) { @Override public Counter counter(String name) { - Counter counter = new Counter(); + return counter(name, new SimpleCounter()); + } + + @Override + public C counter(int name, C counter) { --- End diff -- Ah true. Good idea :-) Maybe we should change it for the `gauge` and `histogram` functions as well. > Expose metric interfaces > > > Key: FLINK-4093 > URL: https://issues.apache.org/jira/browse/FLINK-4093 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
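The API shape under review, a generic `counter(String, C)` that registers a user-supplied counter and returns it with its concrete type, can be sketched in isolation. The types below are simplified stand-ins, not Flink's actual `MetricGroup` API; the point is that the generic return value spares callers a cast when registering custom implementations.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified sketch of generic metric registration (not Flink's real API). */
class MetricGroupSketch {
    interface Counter {
        void inc();
        long getCount();
    }

    static class SimpleCounter implements Counter {
        private long count;
        public void inc() { count++; }
        public long getCount() { return count; }
    }

    private final Map<String, Counter> metrics = new HashMap<>();

    /** Registers the given counter under `name` and returns it with its concrete type. */
    <C extends Counter> C counter(String name, C counter) {
        metrics.put(name, counter);
        return counter;
    }

    /** Convenience overload: defaults to SimpleCounter by delegating to the generic method. */
    Counter counter(String name) {
        return counter(name, new SimpleCounter());
    }

    public static void main(String[] args) {
        MetricGroupSketch group = new MetricGroupSketch();
        // The concrete type flows through, so no cast is needed on registration.
        SimpleCounter c = group.counter("records-in", new SimpleCounter());
        c.inc();
        System.out.println(c.getCount()); // 1
    }
}
```

The same pattern is what the reviewers suggest extending to the `gauge` and `histogram` registration methods.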
[jira] [Closed] (FLINK-3800) ExecutionGraphs can become orphans
[ https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-3800. Resolution: Fixed Fixed via 6420c1c264ed3ce0c32ba164c2cdb85ccdccf265 > ExecutionGraphs can become orphans > -- > > Key: FLINK-3800 > URL: https://issues.apache.org/jira/browse/FLINK-3800 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.0.0, 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{JobManager.cancelAndClearEverything}} method fails all currently > executed jobs on the {{JobManager}} and then clears the list of > {{currentJobs}} kept in the JobManager. This can become problematic if the > user has set a restart strategy for a job, because the {{RestartStrategy}} > will try to restart the job. This can lead to unwanted re-deployments of the > job, which consume resources and thus disrupt the execution of other > jobs. If the restart strategy never stops, then this prevents the > {{ExecutionGraph}} from ever being properly terminated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans
[ https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346669#comment-15346669 ] ASF GitHub Bot commented on FLINK-3800: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2096 > ExecutionGraphs can become orphans > -- > > Key: FLINK-3800 > URL: https://issues.apache.org/jira/browse/FLINK-3800 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.0.0, 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{JobManager.cancelAndClearEverything}} method fails all currently > executed jobs on the {{JobManager}} and then clears the list of > {{currentJobs}} kept in the JobManager. This can become problematic if the > user has set a restart strategy for a job, because the {{RestartStrategy}} > will try to restart the job. This can lead to unwanted re-deployments of the > job, which consume resources and thus disrupt the execution of other > jobs. If the restart strategy never stops, then this prevents the > {{ExecutionGraph}} from ever being properly terminated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Issue Comment Deleted] (FLINK-4098) Iteration support in Python API
[ https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Geoffrey Mon updated FLINK-4098: Comment: was deleted (was: Maybe it would be simpler to implement iterations in Python similar to aggregations?) > Iteration support in Python API > --- > > Key: FLINK-4098 > URL: https://issues.apache.org/jira/browse/FLINK-4098 > Project: Flink > Issue Type: New Feature > Components: Python API >Affects Versions: 1.0.2 >Reporter: Geoffrey Mon >Priority: Minor > > Bulk and delta iterations are not supported in the Python API. Currently > working on this at https://github.com/GEOFBOT/flink -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4098) Iteration support in Python API
[ https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346664#comment-15346664 ] Geoffrey Mon commented on FLINK-4098: - Maybe it would be simpler to implement iterations in Python similar to aggregations? > Iteration support in Python API > --- > > Key: FLINK-4098 > URL: https://issues.apache.org/jira/browse/FLINK-4098 > Project: Flink > Issue Type: New Feature > Components: Python API >Affects Versions: 1.0.2 >Reporter: Geoffrey Mon >Priority: Minor > > Bulk and delta iterations are not supported in the Python API. Currently > working on this at https://github.com/GEOFBOT/flink -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4061) about flink jdbc connect oracle db exists a critical bug
[ https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4061: -- Priority: Major (was: Blocker) > about flink jdbc connect oracle db exists a critical bug > --- > > Key: FLINK-4061 > URL: https://issues.apache.org/jira/browse/FLINK-4061 > Project: Flink > Issue Type: Bug > Components: DataSet API >Affects Versions: 1.1.0 > Environment: ubuntu, jdk1.8.0, Start a Local Flink Cluster >Reporter: dengchangfu > Original Estimate: 168h > Remaining Estimate: 168h > > I use flink-jdbc to connect to an Oracle DB for ETL, so I wrote a demo to test the > feature. The code is simple, but after I submitted this app, an exception occurred. > Exception info: > Caused by: java.lang.NullPointerException > at > org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:231) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > My code: > import org.apache.flink.api.common.typeinfo.BasicTypeInfo; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.DataSet; > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; > import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; > import org.apache.flink.api.table.Row; > import org.apache.flink.api.table.typeutils.RowTypeInfo; > import > org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder; > import java.sql.ResultSet; > import java.sql.Types; > /** > * Skeleton for a Flink Job. > * > * For a full example of a Flink Job, see the WordCountJob.java file in the > * same package/directory or have a look at the website. > * > * You can also generate a .jar file that you can submit on your Flink > * cluster. 
> * Just type > *mvn clean package > * in the projects root directory. > * You will find the jar in > *target/flink-quickstart-0.1-SNAPSHOT-Sample.jar > * > */ > public class Job { > public static final TypeInformation[] fieldTypes = new > TypeInformation[]{ > BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.FLOAT_TYPE_INFO > }; > public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes); > public static void main(String[] args) { > // set up the execution environment > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > JDBCInputFormatBuilder inputBuilder = > JDBCInputFormat.buildJDBCInputFormat() > .setDrivername("oracle.jdbc.driver.OracleDriver") > .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest") > .setUsername("crmii") > .setPassword("crmii") > .setQuery("select CLIENT_ID,OCCUR_BALANCE from > HS_ASSET.FUNDJOUR@OTC") > .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) > .setRowTypeInfo(rowTypeInfo); > DataSet source = env.createInput(inputBuilder.finish()); > source.output(JDBCOutputFormat.buildJDBCOutputFormat() > .setDrivername("oracle.jdbc.driver.OracleDriver") > .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest") > .setUsername("crmii") > .setPassword("crmii") > .setQuery("insert into dengabc (client_id,salary) > values(?,?)") > .setSqlTypes(new int[]{Types.VARCHAR, Types.DOUBLE}) > .finish()); > //source.print(); > //source.first(20).print(); > //dbData.print(); > /** > * Here, you can start creating your execution plan for Flink. > * > * Start with getting some data from the environment, like > *env.readTextFile(textPath); > * > * then, transform the resulting DataSet using operations > * like > *.filter() > *.flatMap() > *.join() > *.coGroup() > * and many more. 
> * Have a look at the programming guide for the Java API: > * > * http://flink.apache.org/docs/latest/apis/batch/index.html > * > * and the examples > * > * http://flink.apache.org/docs/latest/apis/batch/examples.html > * > */ > // execute program > try { > env.execute("Flink Java API Skeleton"); > } catch (Exception e) { > e.getMessage(); > } > } > } > my pom.xml like this: > > http://maven.apache.org/POM/4.0.0; >
[jira] [Commented] (FLINK-4112) Replace SuppressRestartException with disabling RestartStrategy
[ https://issues.apache.org/jira/browse/FLINK-4112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346638#comment-15346638 ] Chesnay Schepler commented on FLINK-4112: - Does this entail removing the SuppressRestartException class? > Replace SuppressRestartException with disabling RestartStrategy > --- > > Key: FLINK-4112 > URL: https://issues.apache.org/jira/browse/FLINK-4112 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Minor > Fix For: 1.2.0 > > > Currently, Flink uses {{SuppressRestartExceptions}} to suppress job restarts. > This mechanism can be used by different components to control the lifecycle > of a job. Since the control flow of exceptions can be quite difficult to > retrace, I propose to replace this mechanism with an explicit > {{RestartStrategy}} disable method. So instead of throwing a > {{SuppressRestartException}} we should disable the {{RestartStrategy}} to > avoid further job restarts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
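The mechanism proposed in FLINK-4112 can be sketched in a few lines. This is a hypothetical illustration only; the class and method names below are not Flink's actual API. Instead of throwing a {{SuppressRestartException}}, a component calls {{disable()}}, and the scheduler consults {{canRestart()}} before attempting a restart:

```java
// Hypothetical sketch of the proposal: restarts are suppressed by
// disabling the strategy, not by throwing a special exception.
// Names are illustrative, not Flink's real RestartStrategy API.
public class RestartStrategySketch {
    private final int maxRestarts;
    private int restarts;
    private volatile boolean disabled;

    public RestartStrategySketch(int maxRestarts) {
        this.maxRestarts = maxRestarts;
    }

    /** Any component may call this to suppress all further restarts,
     *  replacing the throw of a SuppressRestartException. */
    public void disable() {
        disabled = true;
    }

    /** Consulted by the scheduler before restarting a failed job. */
    public boolean canRestart() {
        return !disabled && restarts < maxRestarts;
    }

    public void recordRestart() {
        restarts++;
    }
}
```

The control flow becomes explicit: instead of tracing which component threw the exception, one checks which component called {{disable()}}.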
[jira] [Commented] (FLINK-4098) Iteration support in Python API
[ https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346634#comment-15346634 ] Chesnay Schepler commented on FLINK-4098: - print statements should appear in the taskmanager.out > Iteration support in Python API > --- > > Key: FLINK-4098 > URL: https://issues.apache.org/jira/browse/FLINK-4098 > Project: Flink > Issue Type: New Feature > Components: Python API >Affects Versions: 1.0.2 >Reporter: Geoffrey Mon >Priority: Minor > > Bulk and delta iterations are not supported in the Python API. Currently > working on this at https://github.com/GEOFBOT/flink
[jira] [Commented] (FLINK-4093) Expose metric interfaces
[ https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346611#comment-15346611 ] ASF GitHub Bot commented on FLINK-4093: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2134#discussion_r68256978 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java --- @@ -146,7 +147,16 @@ public Counter counter(int name) { @Override public Counter counter(String name) { - Counter counter = new Counter(); + return counter(name, new SimpleCounter()); + } + + @Override + public C counter(int name, C counter) { --- End diff -- in other words, it allows the instantiation, registration and assignment to a variable or field in a single line without losing the subclass type. > Expose metric interfaces > > > Key: FLINK-4093 > URL: https://issues.apache.org/jira/browse/FLINK-4093 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.1.0 > >
[GitHub] flink pull request #2134: [FLINK-4093] Expose metric interfaces
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2134#discussion_r68256978 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java --- @@ -146,7 +147,16 @@ public Counter counter(int name) { @Override public Counter counter(String name) { - Counter counter = new Counter(); + return counter(name, new SimpleCounter()); + } + + @Override + public C counter(int name, C counter) { --- End diff -- in other words, it allows the instantiation, registration and assignment to a variable or field in a single line without losing the subclass type. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
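The point made in this review comment can be demonstrated with a self-contained mock. The classes below are stand-ins, not Flink's real `AbstractMetricGroup` or `Counter`: a bounded type parameter lets the registration method return the caller's concrete subclass, so instantiation, registration, and assignment fit on one line without a cast.

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained mock of the pattern under discussion; not Flink's classes.
class CounterSketch {
    private long count;
    public void inc() { count++; }
    public long getCount() { return count; }
}

// A subclass with extra behaviour, standing in for a user-defined counter.
class CustomCounterSketch extends CounterSketch {
    public void incBy(long n) { for (long i = 0; i < n; i++) inc(); }
}

class MetricGroupSketch {
    private final Map<String, CounterSketch> metrics = new HashMap<>();

    // Without the type parameter the caller only gets back a CounterSketch
    // and would need a cast to recover the subclass type.
    public CounterSketch counter(String name) {
        return counter(name, new CounterSketch());
    }

    // Bounded generic: registers the counter and returns it as its own type.
    public <C extends CounterSketch> C counter(String name, C counter) {
        metrics.put(name, counter);
        return counter;
    }
}
```

With this signature, `CustomCounterSketch c = group.counter("myC", new CustomCounterSketch());` compiles with no cast, which is exactly the one-liner described in the comment; with a plain `CounterSketch` parameter and return type, the assignment would require an unchecked cast.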
[jira] [Comment Edited] (FLINK-4098) Iteration support in Python API
[ https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346600#comment-15346600 ] Geoffrey Mon edited comment on FLINK-4098 at 6/23/16 3:26 PM: -- I wasn't sure how to put in print statements because they don't seem to appear in the jobmanager {{.out}} when I add them into {{MapFunction._run()}}. Here's a trimmed jobmanager log for a simple Python iteration program that iterates each number in a Dataset:
{code}
INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job ac35083fec7c77cc1707245563aed90d (Flink Java Job at Thu Jun 23 11:14:51 EDT 2016) changed to RUNNING.
INFO org.apache.flink.runtime.taskmanager.Task - Obtaining local cache file for 'flink'
INFO org.apache.flink.runtime.taskmanager.Task - PartialSolution (Bulk Iteration) (1/1) switched to RUNNING
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - PartialSolution (Bulk Iteration) (1/1) (5c0ff0bae3952df909198ebf94f51dcf) switched from DEPLOYING to RUNNING
INFO org.apache.flink.runtime.iterative.task.IterationHeadTask - starting iteration [1]: PartialSolution (Bulk Iteration) (1/1)
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from CREATED to SCHEDULED
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from SCHEDULED to DEPLOYING
INFO org.apache.flink.runtime.taskmanager.Task - MapPartition (PythonMap) (1/1) switched to RUNNING
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from DEPLOYING to RUNNING
INFO org.apache.flink.runtime.iterative.task.IterationTailTask - starting iteration [1]: MapPartition (PythonMap) (1/1)
{code}
The program does not proceed past that point and appears to hang without any errors.
was (Author: hydronium): Here's a jobmanager log for a simple Python iteration program that iterates each number in a Dataset: 2016-06-23 11:14:51,900 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job ac35083fec7c77cc1707245563aed90d (Flink Java Job at Thu Jun 23 11:14:51 EDT 2016) changed to RUNNING. 2016-06-23 11:14:52,266 INFO org.apache.flink.runtime.taskmanager.Task - Obtaining local cache file for 'flink' 2016-06-23 11:14:52,267 INFO org.apache.flink.runtime.taskmanager.Task - PartialSolution (Bulk Iteration) (1/1) switched to RUNNING 2016-06-23 11:14:52,268 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- PartialSolution (Bulk Iteration) (1/1) (5c0ff0bae3952df909198ebf94f51dcf) switched from DEPLOYING to RUNNING 2016-06-23 11:14:53,053 INFO org.apache.flink.runtime.iterative.task.IterationHeadTask - starting iteration [1]: PartialSolution (Bulk Iteration) (1/1) 2016-06-23 11:14:53,059 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from CREATED to SCHEDULED 2016-06-23 11:14:53,059 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from SCHEDULED to DEPLOYING 2016-06-23 11:14:53,077 INFO org.apache.flink.runtime.taskmanager.Task - MapPartition (PythonMap) (1/1) switched to RUNNING 2016-06-23 11:14:53,078 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from DEPLOYING to RUNNING 2016-06-23 11:14:53,155 INFO org.apache.flink.runtime.iterative.task.IterationTailTask - starting iteration [1]: MapPartition (PythonMap) (1/1) The program does not proceed past that point and appears to hang without any errors. 
> Iteration support in Python API > --- > > Key: FLINK-4098 > URL: https://issues.apache.org/jira/browse/FLINK-4098 > Project: Flink > Issue Type: New Feature > Components: Python API >Affects Versions: 1.0.2 >Reporter: Geoffrey Mon >Priority: Minor > > Bulk and delta iterations are not supported in the Python API. Currently > working on this at https://github.com/GEOFBOT/flink
[jira] [Commented] (FLINK-4098) Iteration support in Python API
[ https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346600#comment-15346600 ] Geoffrey Mon commented on FLINK-4098: - Here's a jobmanager log for a simple Python iteration program that iterates each number in a Dataset:
{code}
2016-06-23 11:14:51,900 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job ac35083fec7c77cc1707245563aed90d (Flink Java Job at Thu Jun 23 11:14:51 EDT 2016) changed to RUNNING.
2016-06-23 11:14:52,266 INFO org.apache.flink.runtime.taskmanager.Task - Obtaining local cache file for 'flink'
2016-06-23 11:14:52,267 INFO org.apache.flink.runtime.taskmanager.Task - PartialSolution (Bulk Iteration) (1/1) switched to RUNNING
2016-06-23 11:14:52,268 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - PartialSolution (Bulk Iteration) (1/1) (5c0ff0bae3952df909198ebf94f51dcf) switched from DEPLOYING to RUNNING
2016-06-23 11:14:53,053 INFO org.apache.flink.runtime.iterative.task.IterationHeadTask - starting iteration [1]: PartialSolution (Bulk Iteration) (1/1)
2016-06-23 11:14:53,059 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from CREATED to SCHEDULED
2016-06-23 11:14:53,059 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from SCHEDULED to DEPLOYING
2016-06-23 11:14:53,077 INFO org.apache.flink.runtime.taskmanager.Task - MapPartition (PythonMap) (1/1) switched to RUNNING
2016-06-23 11:14:53,078 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from DEPLOYING to RUNNING
2016-06-23 11:14:53,155 INFO org.apache.flink.runtime.iterative.task.IterationTailTask - starting iteration [1]: MapPartition (PythonMap) (1/1)
{code}
The program does not proceed past that point and appears to hang without any errors.
> Iteration support in Python API > --- > > Key: FLINK-4098 > URL: https://issues.apache.org/jira/browse/FLINK-4098 > Project: Flink > Issue Type: New Feature > Components: Python API >Affects Versions: 1.0.2 >Reporter: Geoffrey Mon >Priority: Minor > > Bulk and delta iterations are not supported in the Python API. Currently > working on this at https://github.com/GEOFBOT/flink
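The hang described in these FLINK-4098 comments (later narrowed down to an iterator's next() blocking while waiting for data that is never sent) is the classic blocked-reader situation. A minimal Java illustration, deliberately unrelated to Flink's actual Python-API plumbing: the reader blocks inside read() until the other side writes, and if the writer never writes, the program stalls with no error, exactly like the job above.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Illustration of a reader blocking until data arrives. If the writer
// thread never wrote, read() would block forever with no exception --
// the same symptom as the hanging iteration job in the log above.
public class BlockingReadDemo {
    public static int readWithLateWriter() throws Exception {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(200);   // simulate the other side being slow
                out.write(42);       // until this happens, read() below blocks
                out.close();
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        writer.start();
        int value = in.read();       // blocks here until a byte is available
        writer.join();
        return value;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("read: " + readWithLateWriter());
    }
}
```

Diagnosing such a hang usually means finding which side is waiting for whom; a thread dump (`jstack`) of the blocked process shows the frame sitting in the blocking read.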
[jira] [Commented] (FLINK-4093) Expose metric interfaces
[ https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346598#comment-15346598 ] ASF GitHub Bot commented on FLINK-4093: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2134#discussion_r68255328 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java --- @@ -146,7 +147,16 @@ public Counter counter(int name) { @Override public Counter counter(String name) { - Counter counter = new Counter(); + return counter(name, new SimpleCounter()); + } + + @Override + public C counter(int name, C counter) { --- End diff -- users can write ```MyCounter c = metricGroup.addCounter("myC", new MyCounter());``` > Expose metric interfaces > > > Key: FLINK-4093 > URL: https://issues.apache.org/jira/browse/FLINK-4093 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.1.0 > >
[GitHub] flink pull request #2134: [FLINK-4093] Expose metric interfaces
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2134#discussion_r68255328 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java --- @@ -146,7 +147,16 @@ public Counter counter(int name) { @Override public Counter counter(String name) { - Counter counter = new Counter(); + return counter(name, new SimpleCounter()); + } + + @Override + public C counter(int name, C counter) { --- End diff -- users can write ```MyCounter c = metricGroup.addCounter("myC", new MyCounter());```
[jira] [Commented] (FLINK-3319) Add or operator to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-3319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346584#comment-15346584 ] ASF GitHub Bot commented on FLINK-3319: --- GitHub user thormanrd opened a pull request: https://github.com/apache/flink/pull/2155 [FLINK-3319] Added Or function for where clause in CEP Pattern Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/thormanrd/flink feature/FLINK-3319 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2155.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2155 commit e71514574e6e7d5623eaf4e556d708c642ccb6f2 Author: Bob Thorman Date: 2016-06-16T12:48:53Z FLINT-3839 Added the extraction of jar files from a URL spec that is a directory in the form of file:///path/to/jars/<*> commit 5705962613ea0e64bfbdfa1b544c3fd8870a4540 Author: Bob Thorman Date: 2016-06-22T00:54:02Z FLINK-3319 Added OrFilterFunction class and the or method to the Pattern class commit 771f7029571db0345e0713c93f914ba945690c97 Author: Bob Thorman Date: 2016-06-22T13:23:48Z FLINK-3319 Added or method to scala Pattern class > Add or operator to CEP's pattern API > > > Key: FLINK-3319 > URL: https://issues.apache.org/jira/browse/FLINK-3319 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Robert Thorman >Priority: Minor > > Adding an {{or}} operator to CEP's pattern API would be beneficial. This > would considerably extend the set of supported patterns. The {{or}} operator > lets you define multiple succeeding pattern states for the next stage. > {code} > Pattern.begin("start").next("middle1").where(...).or("middle2").where(...) > {code} > In order to implement the {{or}} operator, one has to extend the {{Pattern}} > class. Furthermore, the {{NFACompiler}} has to be extended to generate two > resulting pattern states in case of an {{or}} operator.
[GitHub] flink pull request #2155: [FLINK-3319] Added Or function for where clause in...
GitHub user thormanrd opened a pull request: https://github.com/apache/flink/pull/2155 [FLINK-3319] Added Or function for where clause in CEP Pattern Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/thormanrd/flink feature/FLINK-3319 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2155.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2155 commit e71514574e6e7d5623eaf4e556d708c642ccb6f2 Author: Bob Thorman Date: 2016-06-16T12:48:53Z FLINT-3839 Added the extraction of jar files from a URL spec that is a directory in the form of file:///path/to/jars/<*> commit 5705962613ea0e64bfbdfa1b544c3fd8870a4540 Author: Bob Thorman Date: 2016-06-22T00:54:02Z FLINK-3319 Added OrFilterFunction class and the or method to the Pattern class commit 771f7029571db0345e0713c93f914ba945690c97 Author: Bob Thorman Date: 2016-06-22T13:23:48Z FLINK-3319 Added or method to scala Pattern class
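The commit messages above mention an OrFilterFunction added by the PR. The actual class in the pull request may differ, but the composition idea can be sketched with a local stand-in for Flink's FilterFunction interface: an event is accepted if either of two conditions matches.

```java
// Hedged sketch of the OrFilterFunction idea from the commits; the real
// Flink CEP class may differ. FilterFunction here is a local stand-in
// for org.apache.flink.api.common.functions.FilterFunction.
interface FilterFunction<T> {
    boolean filter(T value) throws Exception;
}

class OrFilterFunction<T> implements FilterFunction<T> {
    private final FilterFunction<T> left;
    private final FilterFunction<T> right;

    OrFilterFunction(FilterFunction<T> left, FilterFunction<T> right) {
        this.left = left;
        this.right = right;
    }

    @Override
    public boolean filter(T value) throws Exception {
        // Accept the event if either wrapped condition matches.
        return left.filter(value) || right.filter(value);
    }
}
```

A Pattern's or(...) method would then wrap the existing where-condition and the new one in such a combinator before handing it to the NFA compiler.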
[jira] [Commented] (FLINK-4093) Expose metric interfaces
[ https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346547#comment-15346547 ] ASF GitHub Bot commented on FLINK-4093: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2134 The changes look good to me. +1 for merging. Good work @zentol. I only had a question concerning the use of generic parameters which is not really necessary, imo. > Expose metric interfaces > > > Key: FLINK-4093 > URL: https://issues.apache.org/jira/browse/FLINK-4093 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.1.0 > >
[GitHub] flink issue #2134: [FLINK-4093] Expose metric interfaces
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2134 The changes look good to me. +1 for merging. Good work @zentol. I only had a question concerning the use of generic parameters which is not really necessary, imo.
[jira] [Commented] (FLINK-4093) Expose metric interfaces
[ https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346545#comment-15346545 ] ASF GitHub Bot commented on FLINK-4093: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2134#discussion_r68248549 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java --- @@ -146,7 +147,16 @@ public Counter counter(int name) { @Override public Counter counter(String name) { - Counter counter = new Counter(); + return counter(name, new SimpleCounter()); + } + + @Override + public C counter(int name, C counter) { --- End diff -- What's the advantage of introducing the bounded generic parameter over simply having a `Counter` parameter? > Expose metric interfaces > > > Key: FLINK-4093 > URL: https://issues.apache.org/jira/browse/FLINK-4093 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.1.0 > >
[GitHub] flink pull request #2134: [FLINK-4093] Expose metric interfaces
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2134#discussion_r68248549 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java --- @@ -146,7 +147,16 @@ public Counter counter(int name) { @Override public Counter counter(String name) { - Counter counter = new Counter(); + return counter(name, new SimpleCounter()); + } + + @Override + public C counter(int name, C counter) { --- End diff -- What's the advantage of introducing the bounded generic parameter over simply having a `Counter` parameter?
[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2105#discussion_r68247987 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java --- @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) { if (reporter instanceof Scheduled) { LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name()); - long millis = timeunit.toMillis(period); - timer = new java.util.Timer("Periodic Metrics Reporter", true); - timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis); + executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit); } else { - timer = null; + executor = null; } } catch (Throwable t) { reporter = new JMXReporter(); - timer = null; + executor = null; LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t); } this.reporter = reporter; - this.timer = timer; + this.executor = executor; } } /** * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}. */ public void shutdown() { - if (timer != null) { - timer.cancel(); - } if (reporter != null) { try { reporter.close(); } catch (Throwable t) { LOG.warn("Metrics reporter did not shut down cleanly", t); } } + if (executor != null) { + executor.shutdownNow(); --- End diff -- *correction* all reporters _that regularly send out reports_ follow the same logic.
[jira] [Commented] (FLINK-4074) Reporter can block TaskManager shutdown
[ https://issues.apache.org/jira/browse/FLINK-4074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346541#comment-15346541 ] ASF GitHub Bot commented on FLINK-4074: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2105#discussion_r68247987 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java --- @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) { if (reporter instanceof Scheduled) { LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name()); - long millis = timeunit.toMillis(period); - timer = new java.util.Timer("Periodic Metrics Reporter", true); - timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis); + executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit); } else { - timer = null; + executor = null; } } catch (Throwable t) { reporter = new JMXReporter(); - timer = null; + executor = null; LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t); } this.reporter = reporter; - this.timer = timer; + this.executor = executor; } } /** * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}. */ public void shutdown() { - if (timer != null) { - timer.cancel(); - } if (reporter != null) { try { reporter.close(); } catch (Throwable t) { LOG.warn("Metrics reporter did not shut down cleanly", t); } } + if (executor != null) { + executor.shutdownNow(); --- End diff -- *correction* all reporters _that regularly send out reports_ follow the same logic. 
> Reporter can block TaskManager shutdown > --- > > Key: FLINK-4074 > URL: https://issues.apache.org/jira/browse/FLINK-4074 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.1.0 > > > If a report is being submitted while a TaskManager is shutting down, the > reporter can cause the shutdown to be delayed since it submits the complete > report.
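The diff in this thread replaces a java.util.Timer with a ScheduledExecutorService, and the shutdown concern in FLINK-4074 is addressed because shutdownNow() interrupts an in-flight report instead of waiting for it to finish. A simplified, self-contained sketch of that pattern (class names are illustrative, not Flink's MetricRegistry):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the PR's pattern: periodic reports on a ScheduledExecutorService,
// shut down with shutdownNow() so an in-flight report cannot block shutdown.
class PeriodicReporterSketch {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "Periodic Metrics Reporter");
            t.setDaemon(true); // like the old daemon Timer
            return t;
        });
    final AtomicInteger reportsSent = new AtomicInteger();

    void start(long periodMillis) {
        // scheduleWithFixedDelay waits periodMillis between the END of one
        // report and the start of the next, so slow reports do not pile up.
        executor.scheduleWithFixedDelay(
            reportsSent::incrementAndGet, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        // Interrupts a report in progress instead of waiting for it.
        executor.shutdownNow();
    }
}
```

Compared with Timer.cancel(), which only prevents future tasks and lets a running task complete, shutdownNow() delivers an interrupt to the running report, which is exactly what keeps a blocked reporter from delaying TaskManager shutdown.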
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346488#comment-15346488 ] ASF GitHub Bot commented on FLINK-3974: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2110 Excellent work @aljoscha. Great fix for the problem and it's also really nice that we could get rid of the IT case :-) +1 for merging after addressing my minor comments. > enableObjectReuse fails when an operator chains to multiple downstream > operators > > > Key: FLINK-3974 > URL: https://issues.apache.org/jira/browse/FLINK-3974 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.3 >Reporter: B Wyatt > Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch > > > Given a topology that looks like this: > {code:java} > DataStream input = ... > input > .map(MapFunction...) > .addSink(...); > input > .map(MapFunction...) > .addSink(...); > {code} > enableObjectReuse() will cause an exception in the form of > {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown. > It looks like the input operator calls {{Output.collect}} > which attempts to loop over the downstream operators and process them. > However, the first map operation will call {{StreamRecord<>.replace}} which > mutates the value stored in the StreamRecord<>. > As a result, when the {{Output .collect}} call passes the > {{StreamRecord}} to the second map operation it is actually a > {{StreamRecord}} and behaves as if the two map operations were serial > instead of parallel. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2110: [FLINK-3974] Fix object reuse with multi-chaining
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2110 Excellent work @aljoscha. Great fix for the problem and it's also really nice that we could get rid of the IT case :-) +1 for merging after addressing my minor comments.
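The FLINK-3974 bug described above can be reproduced in miniature without Flink: when one record object is shared by two chained consumers and the first consumer's map replaces the value in place, the second consumer no longer sees the original input. StreamRecordSketch below is a hypothetical stand-in for Flink's StreamRecord, not the real class.

```java
import java.util.ArrayList;
import java.util.List;

// Miniature reproduction of the object-reuse hazard from FLINK-3974;
// StreamRecordSketch stands in for Flink's StreamRecord.
public class ObjectReuseHazard {
    static class StreamRecordSketch {
        Object value;
        StreamRecordSketch replace(Object newValue) {
            this.value = newValue; // in-place mutation, as with enableObjectReuse()
            return this;
        }
    }

    /** Feeds ONE shared record object to two "chained" consumers,
     *  the way Output.collect loops over downstream operators. */
    public static List<Object> broadcast(Object input) {
        StreamRecordSketch record = new StreamRecordSketch().replace(input);
        List<Object> observed = new ArrayList<>();
        // Consumer 1: a map that calls replace(), mutating the shared record.
        record.replace("mapped:" + record.value);
        observed.add(record.value);
        // Consumer 2: expects the original input, but the shared record
        // was already mutated by consumer 1.
        observed.add(record.value);
        return observed;
    }
}
```

The second consumer observes the mapped value rather than the original input, so the two "parallel" maps behave as if chained serially; when the mapped value has a different type, this surfaces as the ClassCastException quoted in the issue.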
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346486#comment-15346486 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2110#discussion_r68241018

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ChainingITCase.java ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.contains;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for stream operator chaining behaviour.
+ */
+public class ChainingITCase {
--- End diff --

I think that this test case no longer needs to be an IT case.

> enableObjectReuse fails when an operator chains to multiple downstream
> operators
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3
> Reporter: B Wyatt
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
> Given a topology that looks like this:
> {code:java}
> DataStream input = ...
> input
>   .map(MapFunction...)
>   .addSink(...);
> input
>   .map(MapFunction...)
>   .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output.collect}},
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}}, which
> mutates the value stored in the StreamRecord<>.
> As a result, when the {{Output.collect}} call passes the
[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2105#discussion_r68238993 --- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java --- @@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) { if (reporter instanceof Scheduled) { LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name()); - long millis = timeunit.toMillis(period); - timer = new java.util.Timer("Periodic Metrics Reporter", true); - timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis); + executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit); } else { - timer = null; + executor = null; } } catch (Throwable t) { reporter = new JMXReporter(); - timer = null; + executor = null; LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t); } this.reporter = reporter; - this.timer = timer; + this.executor = executor; } } /** * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}. */ public void shutdown() { - if (timer != null) { - timer.cancel(); - } if (reporter != null) { try { reporter.close(); } catch (Throwable t) { LOG.warn("Metrics reporter did not shut down cleanly", t); } } + if (executor != null) { + executor.shutdownNow(); --- End diff -- Alright. But not all reporters use the same logic, since we have `Scheduled` and non-scheduled reporters, haven't we? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4062) Update Windowing Documentation
[ https://issues.apache.org/jira/browse/FLINK-4062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346465#comment-15346465 ]

ASF GitHub Bot commented on FLINK-4062:
---

GitHub user aljoscha opened a pull request:
https://github.com/apache/flink/pull/2154

[FLINK-4062] Update Windowing Documentation

R: @uce and @kl0u for review

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink window/doc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2154.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2154

commit 6e0439dffd35c0d924f0ee03d1e075ab6a762989
Author: Aljoscha Krettek
Date: 2016-06-23T08:23:02Z

    [FLINK-4062] Update Windowing Documentation

> Update Windowing Documentation
> ---
>
> Key: FLINK-4062
> URL: https://issues.apache.org/jira/browse/FLINK-4062
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation
> Affects Versions: 1.1.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> The window documentation could be a bit more principled and also needs
> updating with the new allowed lateness setting.
> There is also essentially no documentation about how to write a custom
> trigger.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346484#comment-15346484 ]

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2110#discussion_r68240257

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -386,4 +402,23 @@ public void close() {
 			}
 		}
 	}
+
+	/**
+	 * Special version of {@link BroadcastingOutputCollector} that performs a shallow copy of the
+	 * {@link StreamRecord} to ensure that multi-chaining works correctly.
+	 */
+	private static final class CopyingBroadcastingOutputCollector extends BroadcastingOutputCollector {
+
+		public CopyingBroadcastingOutputCollector(Output[] outputs) {
+			super(outputs);
+		}
+
+		@Override
+		public void collect(StreamRecord record) {
+			for (Output output : outputs) {
+				StreamRecord shallowCopy = record.copy(record.getValue());
+				output.collect(shallowCopy);
+			}
--- End diff --

Here, the same. I think we could save one copying operation by giving the original `record` to the last output.

> enableObjectReuse fails when an operator chains to multiple downstream
> operators
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3
> Reporter: B Wyatt
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
> Given a topology that looks like this:
> {code:java}
> DataStream input = ...
> input
>   .map(MapFunction...)
>   .addSink(...);
> input
>   .map(MapFunction...)
>   .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output.collect}},
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}}, which
> mutates the value stored in the StreamRecord<>.
> As a result, when the {{Output.collect}} call passes the
> {{StreamRecord<A>}} to the second map operation, it is actually a
> {{StreamRecord<B>}} and behaves as if the two map operations were serial
> instead of parallel.
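The failure mode quoted above can be reduced to a few lines. This is a hypothetical minimal model, not Flink's actual classes: a mutable record holder is shared by two chained consumers, and the first consumer's in-place replace()-style mutation leaks into what the second consumer observes.

```java
// Illustrative sketch of FLINK-3974 (names are invented, not Flink's API):
// with object reuse, one StreamRecord-like holder is handed to every
// chained output, so an in-place mutation by the first map is visible
// to the second map, which then fails its cast from B back to A.
public class ObjectReuseBugSketch {

    // Stand-in for a reused StreamRecord<T>: a mutable value holder.
    static final class Record {
        Object value;
        Record(Object v) { value = v; }
    }

    // Returns what the second chained map observes after the first map
    // has mutated the shared record in place.
    public static Object secondMapSees() {
        Record shared = new Record("A"); // emitted once by the upstream operator
        shared.value = "B";              // first map: replace()-style in-place mutation
        return shared.value;             // second map reads the very same object
    }
}
```

The fix in the PR avoids this by giving each chained output a shallow copy of the record instead of the shared instance.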
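The reviewer's suggestion (shallow-copy for every output except the last, which may safely consume the original) could look roughly like the following sketch. All names here are illustrative stand-ins, not Flink's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "give the original record to the last output": when broadcasting
// to N chained outputs, only N-1 defensive copies are needed.
public class LastOutputNoCopySketch {

    // Stand-in for Flink's Output<StreamRecord<T>>.
    interface Output {
        void collect(int[] record);
    }

    static void broadcast(int[] record, List<Output> outputs) {
        for (int i = 0; i < outputs.size() - 1; i++) {
            outputs.get(i).collect(record.clone()); // defensive shallow copy
        }
        outputs.get(outputs.size() - 1).collect(record); // original, no copy
    }

    // Verifies that only the last output received the original instance.
    public static boolean demo() {
        int[] original = {42};
        List<int[]> received = new ArrayList<>();
        List<Output> outs = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            outs.add(received::add);
        }
        broadcast(original, outs);
        return received.get(0) != original
                && received.get(1) != original
                && received.get(2) == original;
    }
}
```

This is safe because by the time the last output runs, no other chained consumer will read the record again.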
[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2110#discussion_r68239970

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Special version of {@link DirectedOutput} that performs a shallow copy of the
+ * {@link StreamRecord} to ensure that multi-chaining works correctly.
+ */
+public class CopyingDirectedOutput extends DirectedOutput {
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public CopyingDirectedOutput(
+			ListoutputSelectors,
+			List, StreamEdge>> outputs) {
+		super(outputSelectors, outputs);
+	}
+
+	@Override
+	public void collect(StreamRecord record) {
+		Set
[jira] [Commented] (FLINK-4074) Reporter can block TaskManager shutdown
[ https://issues.apache.org/jira/browse/FLINK-4074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346469#comment-15346469 ]

ASF GitHub Bot commented on FLINK-4074:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2105#discussion_r68238993

--- Diff: flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
@@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
 			if (reporter instanceof Scheduled) {
 				LOG.info("Periodically reporting metrics in intervals of {} {}", period, timeunit.name());
-				long millis = timeunit.toMillis(period);
-				timer = new java.util.Timer("Periodic Metrics Reporter", true);
-				timer.schedule(new ReporterTask((Scheduled) reporter), millis, millis);
+				executor.scheduleWithFixedDelay(new ReporterTask((Scheduled) reporter), period, period, timeunit);
 			} else {
-				timer = null;
+				executor = null;
 			}
 		} catch (Throwable t) {
 			reporter = new JMXReporter();
-			timer = null;
+			executor = null;
 			LOG.error("Could not instantiate custom metrics reporter. Defaulting to JMX metrics export.", t);
 		}
 		this.reporter = reporter;
-		this.timer = timer;
+		this.executor = executor;
 	}
 }

 /**
  * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
  */
 public void shutdown() {
-	if (timer != null) {
-		timer.cancel();
-	}
 	if (reporter != null) {
 		try {
 			reporter.close();
 		} catch (Throwable t) {
 			LOG.warn("Metrics reporter did not shut down cleanly", t);
 		}
 	}
+	if (executor != null) {
+		executor.shutdownNow();
--- End diff --

Alright. But not all reporters use the same logic, since we have `Scheduled` and non-scheduled reporters, don't we?
> Reporter can block TaskManager shutdown
> ---
>
> Key: FLINK-4074
> URL: https://issues.apache.org/jira/browse/FLINK-4074
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.1.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Minor
> Fix For: 1.1.0
>
> If a report is being submitted while a TaskManager is shutting down, the
> reporter can cause the shutdown to be delayed since it submits the complete
> report.
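The Timer-to-executor switch shown in the diff above can be illustrated with plain JDK classes. This sketch (names are mine, not Flink's) demonstrates `scheduleWithFixedDelay` for the periodic report plus a non-blocking `shutdownNow()`, which interrupts an in-flight task instead of waiting for it:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal model of a periodically scheduled metrics "report" whose
// executor can be torn down without draining a slow reporter.
public class ScheduledReporterSketch {

    public static int demo() {
        AtomicInteger reports = new AtomicInteger();
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        // Fixed delay between the end of one report and the start of the next,
        // mirroring scheduleWithFixedDelay(new ReporterTask(...), period, period, unit).
        executor.scheduleWithFixedDelay(reports::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);

        try {
            Thread.sleep(100); // let a few reports run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // shutdownNow() interrupts the running task and discards queued ones,
        // so shutdown does not block on a slow reporter.
        executor.shutdownNow();
        return reports.get();
    }
}
```

Compared with `java.util.Timer.cancel()`, `shutdownNow()` additionally delivers an interrupt to the currently running task, which is the behavior the PR relies on.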
[jira] [Commented] (FLINK-4087) JMXReporter can't handle port conflicts
[ https://issues.apache.org/jira/browse/FLINK-4087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346461#comment-15346461 ]

ASF GitHub Bot commented on FLINK-4087:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2145#discussion_r68238504

--- Diff: flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---
@@ -73,10 +86,61 @@ public JMXReporter() {
 	//
 	@Override
-	public void open(Configuration config) {}
+	public void open(Configuration config) {
+		this.jmxServer = startJmxServer(config);
+	}
+
+	private static JMXServer startJmxServer(Configuration config) {
+		JMXServer jmxServer;
+
+		String portRange = config.getString(KEY_METRICS_JMX_PORT, "9010-9025");
+		String[] ports = portRange.split("-");
+
+		if (ports.length == 0 || ports.length > 2) {
+			throw new IllegalArgumentException("JMX port range was configured incorrectly. " +
+				"Expected: [-] Configured: " + portRange);
+		}
+
+		if (ports.length == 1) { //single port was configured
+			int port = Integer.parseInt(ports[0]);
+			jmxServer = new JMXServer(port);
+			try {
+				jmxServer.start();
+			} catch (IOException e) {
+				throw new RuntimeException("Could not start JMX server on port " + port + ".");
+			}
+			return jmxServer;
+		} else { //port range was configured
+			int start = Integer.parseInt(ports[0]);
+			int end = Integer.parseInt(ports[1]);
+			while (true) {
+				try {
+					jmxServer = new JMXServer(start);
+					jmxServer.start();
+					LOG.info("Starting JMX on port " + start + ".");
--- End diff --

Sure, but it might be interesting to see explicitly which ports were tried, imho.
> JMXReporter can't handle port conflicts
> ---
>
> Key: FLINK-4087
> URL: https://issues.apache.org/jira/browse/FLINK-4087
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.1.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.1.0
>
> The JMXReporter is currently configured to use a single port that is set as a
> JVM argument.
> This approach has a few disadvantages:
> If multiple TaskManagers run on the same machine, only 1 can expose metrics.
> This issue is compounded by the upcoming JobManager metrics, which would then
> prevent TM metrics from being exposed in local setups.
> Currently, we prevent other TMs from exposing metrics by checking in
> start-daemon.sh whether a TM is already running, and if so clear the
> arguments. This isn't a particularly safe way to do it, and this script is not
> used when deploying on YARN, leading to TM failures since the JVM can't
> allocate the JMX port.
> We should find a way to specify port ranges for JMX and log the final port
> used.
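The port-range handling in the diff above boils down to: parse the range, try each port in order, and keep the first one that binds. A minimal stand-alone sketch of that retry loop, using a plain `ServerSocket` as a stand-in for the real JMX connector server (all names are illustrative):

```java
import java.io.IOException;
import java.net.ServerSocket;

// Sketch of binding the first free port in a configured range such as
// "9010-9025" (a single "9010" is also accepted).
public class PortRangeSketch {

    public static int bindFirstFree(String portSpec) {
        String[] parts = portSpec.split("-");
        int start = Integer.parseInt(parts[0]);
        int end = (parts.length == 2) ? Integer.parseInt(parts[1]) : start;

        for (int port = start; port <= end; port++) {
            try (ServerSocket socket = new ServerSocket(port)) {
                // Port was free; real code would keep the server bound here
                // and log which port was finally used.
                return socket.getLocalPort();
            } catch (IOException e) {
                // Port already in use -- try the next one in the range.
            }
        }
        return -1; // no free port in the configured range
    }
}
```

Passing `"0"` asks the OS for any free ephemeral port, which is handy for tests; a production reporter would log each attempted port, as the reviewer suggests.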
[jira] [Commented] (FLINK-4087) JMXReporter can't handle port conflicts
[ https://issues.apache.org/jira/browse/FLINK-4087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346459#comment-15346459 ]

ASF GitHub Bot commented on FLINK-4087:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2145#discussion_r68238384

--- Diff: flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---
@@ -265,4 +329,72 @@ public Object getValue() {
 		return gauge.getValue();
 	}
 }
+
+	/**
+	 * JMX Server implementation that JMX clients can connect to.
+	 *
+	 * Heavily based on the j256 simplejmx project:
+	 * https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
+	 */
+	private static class JMXServer {
+		private int port;
+		private Registry rmiRegistry;
--- End diff --

Hmm, if we don't understand the code and, thus, cannot explain what it does, we shouldn't include the code. The question is who should maintain this kind of code. I think it would be best if you could do some research to clarify how the `JMXServer` works and add some comments explaining the different components.

> JMXReporter can't handle port conflicts
> ---
>
> Key: FLINK-4087
> URL: https://issues.apache.org/jira/browse/FLINK-4087
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.1.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.1.0
>
> The JMXReporter is currently configured to use a single port that is set as a
> JVM argument.
> This approach has a few disadvantages:
> If multiple TaskManagers run on the same machine, only 1 can expose metrics.
> This issue is compounded by the upcoming JobManager metrics, which would then
> prevent TM metrics from being exposed in local setups.
> Currently, we prevent other TMs from exposing metrics by checking in
> start-daemon.sh whether a TM is already running, and if so clear the
> arguments.
This isn't a particularly safe way to do it, and this script is not
> used when deploying on YARN, leading to TM failures since the JVM can't
> allocate the JMX port.
> We should find a way to specify port ranges for JMX and log the final port
> used.
[jira] [Commented] (FLINK-4110) Provide testing skeleton in quickstarts
[ https://issues.apache.org/jira/browse/FLINK-4110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346457#comment-15346457 ]

ASF GitHub Bot commented on FLINK-4110:
---

GitHub user rmetzger opened a pull request:
https://github.com/apache/flink/pull/2153

[FLINK-4110] Add testing skeleton to quickstart

With this change, our quickstart archetypes will also contain some sample code for bringing up an embedded server for testing purposes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink4110

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2153.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2153

commit e887903d4efd01cff1dcea4dee279c927064a1a4
Author: Robert Metzger
Date: 2016-06-23T13:35:18Z

    [FLINK-4110] Add testing skeleton to quickstart

> Provide testing skeleton in quickstarts
> ---
>
> Key: FLINK-4110
> URL: https://issues.apache.org/jira/browse/FLINK-4110
> Project: Flink
> Issue Type: Improvement
> Components: Build System, Quickstarts
> Affects Versions: 1.1.0
> Reporter: Robert Metzger
>
> Users often ask how they can test their Flink code.
> I suggest to provide a very simple skeleton / example in the quickstarts,
> showing how it's done.
[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix
[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346435#comment-15346435 ] ASF GitHub Bot commented on FLINK-3920: --- GitHub user chobeat opened a pull request: https://github.com/apache/flink/pull/2152 [FLINK-3920] Distributed Linear Algebra: block-based matrix Second part of the distributed linear algebra contribution. This PR introduces block-partitioned matrices, operations on them (multiplication, per-row operations) and conversions from and to row-partitioned matrices. You can merge this pull request into a Git repository by running: $ git pull https://github.com/radicalbit/flink FLINK-3920 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2152.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2152 commit c59125272cc97be21bfe303fe2bb0cdd70c81915 Author: chobeat Date: 2016-06-23T12:37:13Z BlockMatrix > Distributed Linear Algebra: block-based matrix > -- > > Key: FLINK-3920 > URL: https://issues.apache.org/jira/browse/FLINK-3920 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Simone Robutti >Assignee: Simone Robutti
[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans
[ https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346404#comment-15346404 ] ASF GitHub Bot commented on FLINK-3800: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2096 Will be merging this PR. > ExecutionGraphs can become orphans > -- > > Key: FLINK-3800 > URL: https://issues.apache.org/jira/browse/FLINK-3800 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.0.0, 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{JobManager.cancelAndClearEverything}} method fails all currently > executed jobs on the {{JobManager}} and then clears the list of > {{currentJobs}} kept in the JobManager. This can become problematic if the > user has set a restart strategy for a job, because the {{RestartStrategy}} > will try to restart the job. This can lead to unwanted re-deployments of the > job, which consume resources and thus trouble the execution of other > jobs. If the restart strategy never stops, then this prevents the > {{ExecutionGraph}} from ever being properly terminated.
[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans
[ https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346399#comment-15346399 ] ASF GitHub Bot commented on FLINK-3800: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2096 Thanks for your thorough review @uce. - We have talked offline concerning the status in the web interface and it turned out to be not a problem since the jobs are directly removed from `currentJobs` in the `JobManager` when `cancelAndClearEverything` is called. This implies that the jobs will no longer be shown in the web interface. - I agree that it is a good idea to replace the `SuppressRestartException` by a mechanism to disable the `RestartStrategies`. I've opened a [JIRA issue](https://issues.apache.org/jira/browse/FLINK-4112) to keep track of the effort. I think, however, that the mechanism should be implemented as part of the work on this issue. That way we won't mix pull requests with each other. - You're right. I've addressed the comment and added `SUSPENDED` to the set of states which don't throw an exception when encountered in `ExecutionGraph.restart`. - You're right concerning the `JobManagerProcess` tests. Maybe we can refactor some of them in the future by applying a similar pattern as it was used in `JobManagerHARecoveryTest`. > ExecutionGraphs can become orphans > -- > > Key: FLINK-3800 > URL: https://issues.apache.org/jira/browse/FLINK-3800 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.0.0, 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{JobManager.cancelAndClearEverything}} method fails all currently > executed jobs on the {{JobManager}} and then clears the list of > {{currentJobs}} kept in the JobManager. This can become problematic if the > user has set a restart strategy for a job, because the {{RestartStrategy}} > will try to restart the job. 
This can lead to unwanted re-deployments of the > job, which consume resources and thus trouble the execution of other > jobs. If the restart strategy never stops, then this prevents the > {{ExecutionGraph}} from ever being properly terminated.
[jira] [Created] (FLINK-4112) Replace SuppressRestartException with disabling RestartStrategy
Till Rohrmann created FLINK-4112: Summary: Replace SuppressRestartException with disabling RestartStrategy Key: FLINK-4112 URL: https://issues.apache.org/jira/browse/FLINK-4112 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.1.0 Reporter: Till Rohrmann Priority: Minor Fix For: 1.2.0 Currently, Flink uses {{SuppressRestartExceptions}} to suppress job restarts. This mechanism can be used by different components to control the lifecycle of a job. Since the control flow of exceptions can be quite difficult to retrace, I propose to replace this mechanism with an explicit {{RestartStrategy}} disable method. So instead of throwing a {{SuppressRestartException}} we should disable the {{RestartStrategy}} to avoid further job restarts.
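The proposal above can be sketched as follows. This is an illustrative model of an explicit disable() mechanism replacing exception-based control flow; the class and method names are hypothetical, not Flink's actual RestartStrategy API:

```java
// Hypothetical sketch of the FLINK-4112 proposal: components disable the
// restart strategy explicitly instead of throwing SuppressRestartsException.
public class FixedDelayRestartStrategy {
    private final int maxAttempts;
    private int attempts;
    private volatile boolean disabled;

    public FixedDelayRestartStrategy(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Called e.g. when the JobManager suspends all jobs. */
    public void disable() {
        disabled = true;
    }

    public boolean canRestart() {
        return !disabled && attempts < maxAttempts;
    }

    public void restart(Runnable restartAction) {
        if (!canRestart()) {
            throw new IllegalStateException("Restarting is not allowed");
        }
        attempts++;
        restartAction.run();
    }
}
```

Compared to catching a dedicated exception, the disabled flag makes the "no further restarts" decision visible at the single point where restarts are granted.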
[jira] [Commented] (FLINK-3965) Delegating GraphAlgorithm
[ https://issues.apache.org/jira/browse/FLINK-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346395#comment-15346395 ] Vasia Kalavri commented on FLINK-3965: -- The functionality is much needed in my opinion. My concern is what we expose to users and what we keep internal to Gelly. If the {{DelegatingGraphAlgorithm}} and the {{GraphAnalytic}} are intended for users, then we should make their functionalities and differences very clear in the docs, including examples. Maybe that can be done as part of FLINK-4104? > Delegating GraphAlgorithm > - > > Key: FLINK-3965 > URL: https://issues.apache.org/jira/browse/FLINK-3965 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Complex and related algorithms often overlap in computation of data. Two such > examples are: > 1) the local and global clustering coefficients each use a listing of > triangles > 2) the local clustering coefficient joins on vertex degree, and the > underlying triangle listing annotates edge degree which uses vertex degree > We can reuse and rewrite algorithm output by creating a {{ProxyObject}} as a > delegate for method calls to the {{DataSet}} returned by the algorithm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
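The {{ProxyObject}} delegation described in the issue can be illustrated with java.lang.reflect.Proxy: calls on the proxy are forwarded to a result that is materialized at most once, so overlapping algorithms can share it. A minimal self-contained sketch, using List as a stand-in for the DataSet returned by an algorithm:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.function.Supplier;

public class Delegating {
    // Returns a proxy implementing List that forwards every call to a result
    // computed at most once. In Gelly the delegated type would be DataSet and
    // the supplier would run (or reuse) e.g. the shared triangle listing.
    @SuppressWarnings("unchecked")
    static <T> List<T> delegate(Supplier<List<T>> compute) {
        return (List<T>) Proxy.newProxyInstance(
            Delegating.class.getClassLoader(),
            new Class<?>[]{List.class},
            new InvocationHandler() {
                private List<T> result; // materialized on first call

                @Override
                public synchronized Object invoke(Object proxy, Method m, Object[] args)
                        throws Throwable {
                    if (result == null) {
                        result = compute.get();
                    }
                    return m.invoke(result, args);
                }
            });
    }
}
```

Two algorithms handed the same proxy would thus trigger the underlying computation only once, which is the reuse the issue is after.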
[jira] [Commented] (FLINK-3647) Change StreamSource to use Processing-Time Clock Service
[ https://issues.apache.org/jira/browse/FLINK-3647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346364#comment-15346364 ] ASF GitHub Bot commented on FLINK-3647: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2124#discussion_r68227329 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java --- @@ -27,10 +27,12 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; /** - * A {@link Trigger} that continuously fires based on a given time interval. The time is the current - * system time. + * A {@link Trigger} that continuously fires based on a given time interval. The current (processing) + * time is provided by the {@link TimeServiceProvider} --- End diff -- I think this is an implementation detail that should not necessarily be mentioned here. > Change StreamSource to use Processing-Time Clock Service > > > Key: FLINK-3647 > URL: https://issues.apache.org/jira/browse/FLINK-3647 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > > Currently, the {{StreamSource.AutomaticWatermarkContext}} has it's own timer > service. This should be changed to use the Clock service introduced in > FLINK-3646 to make watermark emission testable by providing a custom Clock. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2124: [FLINK-3647] Change StreamSource to use Processing...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2124#discussion_r68227329 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java --- @@ -27,10 +27,12 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; /** - * A {@link Trigger} that continuously fires based on a given time interval. The time is the current - * system time. + * A {@link Trigger} that continuously fires based on a given time interval. The current (processing) + * time is provided by the {@link TimeServiceProvider} --- End diff -- I think this is an implementation detail that should not necessarily be mentioned here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3647) Change StreamSource to use Processing-Time Clock Service
[ https://issues.apache.org/jira/browse/FLINK-3647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346360#comment-15346360 ] ASF GitHub Bot commented on FLINK-3647: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2124#discussion_r68227049 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java --- @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.windowing.assigners; + +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * A context provided to the {@link WindowAssigner} that allows it to query the + * current processing time. This is provided to the assigner by its containing + * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}, + * which, in turn, gets it from the containing + * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. + */ +public abstract class WindowAssignerContext { --- End diff -- This should be moved to `WindowAssigner`, similarly to how `TriggerContext` is an inner class of `Trigger`. 
> Change StreamSource to use Processing-Time Clock Service > > > Key: FLINK-3647 > URL: https://issues.apache.org/jira/browse/FLINK-3647 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > > Currently, the {{StreamSource.AutomaticWatermarkContext}} has it's own timer > service. This should be changed to use the Clock service introduced in > FLINK-3646 to make watermark emission testable by providing a custom Clock. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
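The restructuring suggested in the review (nesting the context class in WindowAssigner, as TriggerContext is nested in Trigger) might look roughly like this; the signatures are simplified for illustration and are not Flink's actual API:

```java
import java.util.Collection;

// Sketch of the suggested structure, mirroring Trigger.TriggerContext:
// the context is a nested abstract class of the assigner itself, so the
// two evolve together and the outer namespace stays clean.
public abstract class WindowAssigner<T, W> {

    /** Queried by assigners that need the current processing time. */
    public abstract static class WindowAssignerContext {
        public abstract long getCurrentProcessingTime();
    }

    public abstract Collection<W> assignWindows(
            T element, long timestamp, WindowAssignerContext context);
}
```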
[jira] [Commented] (FLINK-3647) Change StreamSource to use Processing-Time Clock Service
[ https://issues.apache.org/jira/browse/FLINK-3647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346356#comment-15346356 ] ASF GitHub Bot commented on FLINK-3647: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2124#discussion_r68226912 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java --- @@ -186,16 +199,19 @@ public DistributedCache getDistributedCache() { public ReducingState getReducingState(ReducingStateDescriptor stateProperties) { throw new UnsupportedOperationException(); } - + + public long getCurrentProcessingTime() { + Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized."); + return timerService.getCurrentProcessingTime(); + } + @Override public ScheduledFuture registerTimer(final long time, final Triggerable target) { - if (timer == null) { - timer = Executors.newSingleThreadScheduledExecutor(); - } + Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized."); - final long delay = Math.max(time - System.currentTimeMillis(), 0); + final long delay = Math.max(time - timerService.getCurrentProcessingTime(), 0); --- End diff -- `timerService.registerTimer()` takes an absolute timestamp not a delay now. > Change StreamSource to use Processing-Time Clock Service > > > Key: FLINK-3647 > URL: https://issues.apache.org/jira/browse/FLINK-3647 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > > Currently, the {{StreamSource.AutomaticWatermarkContext}} has it's own timer > service. This should be changed to use the Clock service introduced in > FLINK-3646 to make watermark emission testable by providing a custom Clock. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
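The review point, that the timer service's registerTimer() now takes an absolute timestamp rather than a delay, can be sketched with a mock time service. Names here are illustrative stand-ins, not Flink's actual interfaces:

```java
import java.util.ArrayList;
import java.util.List;

public class TimerExample {
    /** Minimal stand-in for the processing-time service discussed in the review. */
    static class MockTimerService {
        long now;
        final List<Long> registered = new ArrayList<>();

        long getCurrentProcessingTime() { return now; }

        /** Takes an ABSOLUTE timestamp, as the review comment points out. */
        void registerTimer(long timestamp) { registered.add(timestamp); }
    }

    // Correct pattern: compute the delay against the service's clock (not
    // System.currentTimeMillis()), but hand the service an absolute time.
    static void scheduleAt(MockTimerService service, long time) {
        long delay = Math.max(time - service.getCurrentProcessingTime(), 0);
        service.registerTimer(service.getCurrentProcessingTime() + delay);
    }
}
```

Using the service's own clock everywhere is what makes watermark and timer behavior testable with a custom Clock, per the parent issue.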
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346340#comment-15346340 ] ASF GitHub Bot commented on FLINK-3974: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2110 @tillrohrmann I pushed a commit that removes the per-operator object-reuse setting, refactors broadcasting and direct outputs and changes the ITCase to a test case. Happy reviewing. > enableObjectReuse fails when an operator chains to multiple downstream > operators > > > Key: FLINK-3974 > URL: https://issues.apache.org/jira/browse/FLINK-3974 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.3 >Reporter: B Wyatt > Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch > > > Given a topology that looks like this: > {code:java} > DataStream input = ... > input > .map(MapFunction...) > .addSink(...); > input > .map(MapFunction...) > .addSink(...); > {code} > enableObjectReuse() will cause an exception in the form of > {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown. > It looks like the input operator calls {{Output.collect}} > which attempts to loop over the downstream operators and process them. > However, the first map operation will call {{StreamRecord<>.replace}} which > mutates the value stored in the StreamRecord<>. > As a result, when the {{Output.collect}} call passes the > {{StreamRecord}} to the second map operation it is actually a > {{StreamRecord}} and behaves as if the two map operations were serial > instead of parallel.
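The aliasing at the heart of this bug can be reproduced with a simplified, self-contained stand-in for StreamRecord (a mutable value holder): when the same record object is broadcast to two chained maps and the first one mutates it in place, the second map sees the already transformed value.

```java
public class ObjectReuseDemo {
    /** Simplified stand-in for StreamRecord: a mutable value holder. */
    static class Record {
        Object value;

        Record replace(Object newValue) {
            this.value = newValue; // mutates in place, returns the SAME object
            return this;
        }
    }

    public static void main(String[] args) {
        Record record = new Record();
        record.value = "B"; // a value of "type B" flowing in

        // Broadcasting the SAME record object to two "parallel" maps:
        record.replace(((String) record.value) + "->A"); // first map mutates it

        // With object reuse, the second map receives the mutated record,
        // i.e. an A where it expects a B: the reported ClassCastException.
        Record seenBySecond = record;
        assert seenBySecond.value.equals("B->A");
    }
}
```

The fix direction discussed in the PR is to make broadcasting/chaining copy (or otherwise isolate) the record so the two downstream operators behave as parallel consumers again.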
[jira] [Commented] (FLINK-4046) Failing a restarting job can get stuck in JobStatus.FAILING
[ https://issues.apache.org/jira/browse/FLINK-4046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346325#comment-15346325 ] ASF GitHub Bot commented on FLINK-4046: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2095 Agreed. I had the same thought regarding checking for any final state right after posting this. ;) +1 to merge. > Failing a restarting job can get stuck in JobStatus.FAILING > --- > > Key: FLINK-4046 > URL: https://issues.apache.org/jira/browse/FLINK-4046 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0 > > > When a job is in state {{RESTARTING}}, then it can happen that all of its > {{ExecutionJobVertices}} are in a final state (if they have not been reset). > Calling {{fail}} on this {{ExecutionGraph}} will transition the state to > {{FAILING}} and call cancel on all {{ExecutionJobVertices}}. The job state > {{FAILING}} can only be left iff all {{ExecutionJobVertices}} have reached a > final state. The notification of this final state is only sent to the > {{ExecutionGraph}} when all subtasks of an {{ExecutionJobVertex}} have > transitioned to a final state. However, this won't happen because the > {{ExecutionJobVertices}} are already in a final state. The result is that a > job can get stuck in the state {{FAILING}} if {{fail}} is called on a > {{RESTARTING}} job. > I propose to add a direct transition from {{RESTARTING}} to {{FAILED}} as it > is the case for the {{cancel}} call (transition from {{RESTARTING}} to > {{CANCELED}}).
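The proposed fix can be modeled as a transition table. This is an illustrative sketch of the state-machine idea, not the actual ExecutionGraph code: the new direct RESTARTING to FAILED edge mirrors the existing RESTARTING to CANCELED one, so fail() on a restarting job no longer has to pass through FAILING and wait for vertex notifications that will never arrive.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

public class JobStateMachine {
    enum JobStatus { RESTARTING, FAILING, FAILED, CANCELING, CANCELED }

    // Illustrative transition table (only the edges discussed here).
    static final Map<JobStatus, EnumSet<JobStatus>> TRANSITIONS =
            new EnumMap<>(JobStatus.class);
    static {
        // The fix: direct RESTARTING -> FAILED, alongside the existing
        // RESTARTING -> CANCELED used by cancel().
        TRANSITIONS.put(JobStatus.RESTARTING, EnumSet.of(JobStatus.FAILED, JobStatus.CANCELED));
        TRANSITIONS.put(JobStatus.FAILING, EnumSet.of(JobStatus.FAILED));
        TRANSITIONS.put(JobStatus.CANCELING, EnumSet.of(JobStatus.CANCELED));
    }

    static boolean canTransition(JobStatus from, JobStatus to) {
        return TRANSITIONS
                .getOrDefault(from, EnumSet.noneOf(JobStatus.class))
                .contains(to);
    }
}
```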
[jira] [Created] (FLINK-4111) Flink Table & SQL doesn't work in very simple example
Jark Wu created FLINK-4111: -- Summary: Flink Table & SQL doesn't work in very simple example Key: FLINK-4111 URL: https://issues.apache.org/jira/browse/FLINK-4111 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.1.0 Reporter: Jark Wu Fix For: 1.1.0 I'm trying to use Flink Table 1.1-SNAPSHOT where I want to use Table API and SQL in my project. But when I run the very simple example WordCountTable, I encountered the following exception: {code} Exception in thread "main" java.lang.NoSuchMethodError: org.apache.calcite.rel.logical.LogicalAggregate.getGroupSets()Lorg/apache/flink/shaded/com/google/common/collect/ImmutableList; at org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule.matches(DataSetAggregateRule.scala:47) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:269) at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:253) at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1542) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1817) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058) at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:723) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:331) at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:250) at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) at org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41) at com.alibaba.flink.examples.WordCountTable$.main(WordCountTable.scala:43) at com.alibaba.flink.examples.WordCountTable.main(WordCountTable.scala) {code} It seems that something is wrong with our guava shade. Do you have any ideas?
My pom file and WordCountTable.scala are [here|https://gist.github.com/wuchong/9c1c0df3cb7453502abc4605f5347289]. And I found someone with the same problem on Stack Overflow: [http://stackoverflow.com/questions/37835408/org-apache-flink-api-table-tableexception-alias-on-field-reference-expression-e#comment63160086_37838816]
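For context on the error: the stack trace shows Calcite code compiled against Flink's relocated guava (the method returns `Lorg/apache/flink/shaded/com/google/common/collect/ImmutableList;`), while an unrelocated Calcite/guava combination is apparently on the classpath. Such relocation is typically expressed with the maven-shade-plugin; the fragment below is illustrative of the mechanism only, not the actual fix applied for this ticket:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <!-- Rewrites guava references so shaded and unshaded
               copies cannot be mixed at runtime. -->
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>org.apache.flink.shaded.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

A NoSuchMethodError of this shape generally means one side of a dependency pair was relocated and the other was not.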
[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans
[ https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346322#comment-15346322 ] ASF GitHub Bot commented on FLINK-3800: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2096#discussion_r68222088 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -1029,21 +1061,25 @@ else if (current == JobStatus.CANCELLING) { } } else if (current == JobStatus.FAILING) { - if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) { - // double check in case that in the meantime a SuppressRestartsException was thrown - if (restartStrategy.canRestart()) { - restartStrategy.restart(this); - break; - } else { - fail(new Exception("ExecutionGraph went into RESTARTING state but " + - "then the restart strategy was disabled.")); - } - - } else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) { + boolean allowRestart = !(failureCause instanceof SuppressRestartsException); + + if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) { + restartStrategy.restart(this); + break; + } else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) { postRunCleanup(); break; } } + else if (current == JobStatus.SUSPENDED) { + // we've already cleaned up when entering the SUSPENDED state + break; + } + else if (current.isGloballyTerminalState()) { + LOG.warn("Job has entered globally terminal state without waiting for all " + + "job vertices to reach final state."); + break; + } else { fail(new Exception("ExecutionGraph went into final state from state " + current)); --- End diff -- Yes you're right. The `break` is missing here. Will add it. 
> ExecutionGraphs can become orphans > -- > > Key: FLINK-3800 > URL: https://issues.apache.org/jira/browse/FLINK-3800 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.0.0, 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{JobManager.cancelAndClearEverything}} method fails all currently > executed jobs on the {{JobManager}} and then clears the list of > {{currentJobs}} kept in the JobManager. This can become problematic if the > user has set a restart strategy for a job, because the {{RestartStrategy}} > will try to restart the job. This can lead to unwanted re-deployments of the > job, which consume resources and thus trouble the execution of other > jobs. If the restart strategy never stops, then this prevents the > {{ExecutionGraph}} from ever being properly terminated.
[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans
[ https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346319#comment-15346319 ] ASF GitHub Bot commented on FLINK-3800:
---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2096#discussion_r68221848

--- Diff: flink-runtime/src/test/resources/log4j-test.properties ---
@@ -16,7 +16,7 @@
 # limitations under the License.

-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
--- End diff --
Yes, of course. Will fix it.
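For context, the reverted line belongs to a standard log4j 1.x properties file. A minimal version that keeps test logging off by default might look like the following (a sketch under assumed appender settings, not the full Flink file):

```properties
# Root logger: everything off by default, attached to a console appender.
# Flip OFF to INFO locally when debugging a test, but do not commit that change.
log4j.rootLogger=OFF, console

# Console appender writing to stdout with a simple pattern layout.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```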
[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans
[ https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346318#comment-15346318 ] ASF GitHub Bot commented on FLINK-3800:
---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2096#discussion_r68221836

--- Diff: docs/internals/job_scheduling.md ---
@@ -74,7 +74,28 @@ Besides the vertices, the ExecutionGraph also contains the {% gh_link /flink-run
-During its execution, each parallel task goes through multiple stages, from *created* to *finished* or *failed*. The diagram below illustrates the
+Each ExecutionGraph has a job status associated with it.
+This job status indicates the current state of the job execution.
+
+A Flink job is first in the *created* state, then switches to *running* and upon completion of all work it switches to *finished*.
+In case of failures, a job switches first to *failing* where it cancels all running tasks.
+If all job vertices have reached a final state and the job is not restartable, then the job transitions to *failed*.
+If the job can be restarted, then it will enter the *restarting* state.
+Once the job has been completely restarted, it will reach the *created* state.
+
+In case that the user cancels the job, it will go into the *cancelling* state.
+This is also entails the cancellation of all currently running tasks.
--- End diff --
Thanks for spotting :-)
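The job-status lifecycle described in the documentation diff above can be sketched as a small transition table (hypothetical types, not Flink's actual {{JobStatus}} class): created leads to running and then finished on success, failures pass through failing to either failed or restarting (which leads back to created), and user cancellation passes through cancelling to cancelled.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Sketch of the job-status state machine from the docs text above.
public class JobLifecycleSketch {

    enum Status { CREATED, RUNNING, FINISHED, FAILING, FAILED, RESTARTING, CANCELLING, CANCELLED }

    // Legal transitions per state; FINISHED, FAILED, and CANCELLED are terminal.
    private static final Map<Status, Set<Status>> TRANSITIONS = new EnumMap<>(Status.class);
    static {
        TRANSITIONS.put(Status.CREATED,    EnumSet.of(Status.RUNNING, Status.CANCELLING, Status.FAILING));
        TRANSITIONS.put(Status.RUNNING,    EnumSet.of(Status.FINISHED, Status.CANCELLING, Status.FAILING));
        TRANSITIONS.put(Status.FAILING,    EnumSet.of(Status.FAILED, Status.RESTARTING));
        TRANSITIONS.put(Status.RESTARTING, EnumSet.of(Status.CREATED));
        TRANSITIONS.put(Status.CANCELLING, EnumSet.of(Status.CANCELLED));
        TRANSITIONS.put(Status.FINISHED,   EnumSet.noneOf(Status.class));
        TRANSITIONS.put(Status.FAILED,     EnumSet.noneOf(Status.class));
        TRANSITIONS.put(Status.CANCELLED,  EnumSet.noneOf(Status.class));
    }

    static boolean isLegal(Status from, Status to) {
        return TRANSITIONS.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(isLegal(Status.FAILING, Status.RESTARTING)); // true
        System.out.println(isLegal(Status.FINISHED, Status.RUNNING));   // false
    }
}
```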
[jira] [Commented] (FLINK-4046) Failing a restarting job can get stuck in JobStatus.FAILING
[ https://issues.apache.org/jira/browse/FLINK-4046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346317#comment-15346317 ] ASF GitHub Bot commented on FLINK-4046:
---
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2095

> Failing a restarting job can get stuck in JobStatus.FAILING
> --
>
> Key: FLINK-4046
> URL: https://issues.apache.org/jira/browse/FLINK-4046
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.1.0
>
> When a job is in state {{RESTARTING}}, it can happen that all of its {{ExecutionJobVertices}} are in a final state (if they have not been reset). Calling {{fail}} on this {{ExecutionGraph}} will then transition the state to {{FAILING}} and call cancel on all {{ExecutionJobVertices}}. The job state {{FAILING}} can only be left once all {{ExecutionJobVertices}} have reached a final state. The notification of this final state is only sent to the {{ExecutionGraph}} when all subtasks of an {{ExecutionJobVertex}} have transitioned to a final state. However, this won't happen because the {{ExecutionJobVertices}} are already in a final state. The result is that a job can get stuck in the state {{FAILING}} if {{fail}} is called on a {{RESTARTING}} job.
>
> I propose to add a direct transition from {{RESTARTING}} to {{FAILED}}, as is the case for the {{cancel}} call (transition from {{RESTARTING}} to {{CANCELED}}).
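The deadlock and the proposed fix can be illustrated with a minimal sketch (hypothetical names, not Flink's implementation): when {{fail}} is called on a {{RESTARTING}} job whose vertices are already final, no further state-change notifications will ever arrive, so the job must transition directly to {{FAILED}} instead of waiting in {{FAILING}}.

```java
// Sketch of the direct RESTARTING -> FAILED transition proposed in FLINK-4046.
public class DirectFailTransitionSketch {

    enum JobStatus { RUNNING, RESTARTING, FAILING, FAILED }

    /** Resulting state after fail() is called, mirroring the proposed fix. */
    static JobStatus fail(JobStatus current, boolean allVerticesAlreadyFinal) {
        if (current == JobStatus.RESTARTING && allVerticesAlreadyFinal) {
            // Direct transition: the vertices will never send another
            // notification, so waiting in FAILING would never terminate.
            return JobStatus.FAILED;
        }
        // Otherwise cancel the running vertices and wait for them in FAILING.
        return JobStatus.FAILING;
    }

    public static void main(String[] args) {
        System.out.println(fail(JobStatus.RESTARTING, true)); // FAILED
        System.out.println(fail(JobStatus.RUNNING, false));   // FAILING
    }
}
```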