[jira] [Commented] (FLINK-4098) Iteration support in Python API

2016-06-23 Thread Geoffrey Mon (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347577#comment-15347577
 ] 

Geoffrey Mon commented on FLINK-4098:
-

Thanks for the {{sys.stdout.flush()}} tip. After adding more debug statements, 
I found that the real issue is in the Iterator: when next() is called, the 
function starts but then hangs, waiting for data to be sent so that it can be 
read.
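For illustration, a minimal sketch (plain Python, not Flink's actual connection code; the class and channel here are hypothetical) of how a next() that blocks on a data channel can hang forever when the other side never sends:

```python
import queue

class ConnectionIterator:
    """Illustrative stand-in for an iterator that reads records from a
    data channel shared with the Java side (not Flink's real class)."""
    def __init__(self, channel):
        self.channel = channel

    def __iter__(self):
        return self

    def __next__(self):
        # Blocks here until the other side sends data; if the Java side
        # is itself waiting for Python to request data, both sides hang.
        try:
            return self.channel.get(timeout=0.2)  # timeout only for demo
        except queue.Empty:
            raise StopIteration

channel = queue.Queue()
channel.put(b"record-1")
it = ConnectionIterator(channel)
print(next(it))  # b'record-1'; a second next() blocks until the timeout
```

With no timeout, the second next() call would block indefinitely, which matches the observed symptom: the function starts but never returns.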

> Iteration support in Python API
> ---
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Affects Versions: 1.0.2
>Reporter: Geoffrey Mon
>Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently 
> working on this at https://github.com/GEOFBOT/flink



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4109) Change the name of ternary condition operator 'eval' to '?'

2016-06-23 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347564#comment-15347564
 ] 

Jark Wu commented on FLINK-4109:


[~vkalavri] [~chengxiang li] [~rmetzger] what do you think?

> Change the name of ternary condition operator  'eval' to  '?'  
> ---
>
> Key: FLINK-4109
> URL: https://issues.apache.org/jira/browse/FLINK-4109
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Jark Wu
>Assignee: Jark Wu
> Fix For: 1.1.0
>
>
> The ternary condition operator in the Table API is named {{eval}}; for example, 
> {{(42 > 5).eval("A", "B")}} evaluates to "A". IMO, the name {{eval}} is not 
> well understood. I think "?" is a better choice, since that is the conditional 
> operator in Java.
> It would be clearer and more intuitively understood, e.g.
> {{(42 > 5).?("A", "B")}} or {{(42 > 5) ? ("A", "B")}}
> If this makes sense, I will open a pull request.
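For readers unfamiliar with the operator in question, a toy sketch (plain Python, not the actual Scala/Java Table API; {{BoolExpr}} is an invented stand-in) of the semantics under discussion:

```python
class BoolExpr:
    """Toy stand-in for a Table API boolean expression node."""
    def __init__(self, value):
        self.value = value

    # The current Table API name: reads like "evaluate", which hides
    # the fact that it is a two-way conditional.
    def eval(self, if_true, if_false):
        return if_true if self.value else if_false

print(BoolExpr(42 > 5).eval("A", "B"))  # A, as in the issue's example
```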





[GitHub] flink pull request #2157: [FLINK-4115] FsStateBackend filesystem verificatio...

2016-06-23 Thread jfg9
GitHub user jfg9 opened a pull request:

https://github.com/apache/flink/pull/2157

[FLINK-4115] FsStateBackend filesystem verification can cause classpath 
exceptions

There are two changes to FsStateBackend to avoid classpath exceptions when 
submitting a job from a Flink client which does not have the necessary file 
system classes on its classpath:

- The filesystem is no longer initialised in the FsStateBackend constructor 
- The verification checks for the checkpoint directory in the 
FsStateBackend constructor are skipped if Flink does not have built-in support 
for the URI scheme
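The second change can be sketched in a few lines (illustrative Python, not the actual Java backend; the scheme list and function name are assumptions for the demo):

```python
from urllib.parse import urlparse

# Hypothetical scheme list for illustration; the real set of file
# systems Flink supports out of the box differs.
BUILT_IN_SCHEMES = {"file", "hdfs"}

def validate_checkpoint_uri(uri):
    """Sketch of the PR's approach: verify the checkpoint path eagerly
    only when the client has built-in support for the URI scheme, and
    otherwise defer all filesystem access to the cluster side."""
    scheme = urlparse(uri).scheme or "file"
    if scheme not in BUILT_IN_SCHEMES:
        # e.g. an s3:// URI on EMR: the client lacks the FS classes, so
        # skipping eager verification avoids the classpath exception.
        return "deferred"
    # Here the real backend would initialise the FileSystem and check
    # that the checkpoint directory exists.
    return "verified"

print(validate_checkpoint_uri("s3://bucket/checkpoints"))  # deferred
print(validate_checkpoint_uri("hdfs:///checkpoints"))      # verified
```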

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jfg9/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2157.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2157


commit e8557895b4ff8e40a0705152ec65d355fe7d5f65
Author: Josh 
Date:   2016-06-23T21:18:02Z

[FLINK-4115] Skip filesystem checks for filesystems with no built-in support

commit 94ec8dca6c016c1c500ade90b4c823e5faaecaf3
Author: Josh 
Date:   2016-06-23T21:22:40Z

[FLINK-4115] Remove filesystem initialisation from FsStateBackend 
constructor




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4115) FsStateBackend filesystem verification can cause classpath exceptions

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347390#comment-15347390
 ] 

ASF GitHub Bot commented on FLINK-4115:
---

GitHub user jfg9 opened a pull request:

https://github.com/apache/flink/pull/2157

[FLINK-4115] FsStateBackend filesystem verification can cause classpath 
exceptions

There are two changes to FsStateBackend to avoid classpath exceptions when 
submitting a job from a Flink client which does not have the necessary file 
system classes on its classpath:

- The filesystem is no longer initialised in the FsStateBackend constructor 
- The verification checks for the checkpoint directory in the 
FsStateBackend constructor are skipped if Flink does not have built-in support 
for the URI scheme

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jfg9/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2157.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2157


commit e8557895b4ff8e40a0705152ec65d355fe7d5f65
Author: Josh 
Date:   2016-06-23T21:18:02Z

[FLINK-4115] Skip filesystem checks for filesystems with no built-in support

commit 94ec8dca6c016c1c500ade90b4c823e5faaecaf3
Author: Josh 
Date:   2016-06-23T21:22:40Z

[FLINK-4115] Remove filesystem initialisation from FsStateBackend 
constructor




> FsStateBackend filesystem verification can cause classpath exceptions
> -
>
> Key: FLINK-4115
> URL: https://issues.apache.org/jira/browse/FLINK-4115
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.1.0
>Reporter: Josh Forman-Gornall
>Priority: Minor
>
> In the constructor of FsStateBackend, the FileSystem for the checkpoint 
> directory is initialised and it is verified that the checkpoint path exists. 
> This verification happens in the Flink client program when submitting a job 
> and can cause classpath issues if classes required to access the file system 
> are not available in the client's classpath.
> For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend 
> and an s3:// checkpoint directory, we get the below ClassNotFoundException. 
> This is because the jars needed to use the EMR file system are available only 
> in the YARN context and not when submitting the job via the Flink client.
> {noformat}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>   at 
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>   at 
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>   at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
>   at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:175)
>   at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:144)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.(RocksDBStateBackend.java:205)
> {noformat}





[jira] [Commented] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics

2016-06-23 Thread vishnu viswanath (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347222#comment-15347222
 ] 

vishnu viswanath commented on FLINK-3962:
-

I am getting this exception in local execution as well (from the IDE or when run 
through {{flink run}}).

{code}
17:16:38,095 ERROR org.apache.flink.metrics.reporter.JMXReporter
 - A metric with the name 
org.apache.flink.metrics:key0=localhost,key1=taskmanager,key2=f7bccd0c64bc9c3643647913eced7104,key3=Streaming_Job,key4=Map,key5=0,name=numRecordsIn
 was already registered.
javax.management.InstanceAlreadyExistsException: 
org.apache.flink.metrics:key0=localhost,key1=taskmanager,key2=f7bccd0c64bc9c3643647913eced7104,key3=Streaming_Job,key4=Map,key5=0,name=numRecordsIn
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at 
org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:109)
at 
org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:174)
at 
org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:194)
at 
org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:150)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:141)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:235)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
at java.lang.Thread.run(Thread.java:745)
{code}
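The failure mode can be modelled in a few lines (a toy registry in Python, not the JDK's MBean server): registration is keyed by object name, and a second registration under the same name is rejected, which is why a missing deregistration on task teardown surfaces as InstanceAlreadyExistsException:

```python
class MBeanRepository:
    """Toy model of the JMX repository behaviour: one bean per name."""
    def __init__(self):
        self._beans = {}

    def register(self, name, bean):
        if name in self._beans:
            # Analogue of javax.management.InstanceAlreadyExistsException
            raise ValueError(f"InstanceAlreadyExistsException: {name}")
        self._beans[name] = bean

    def unregister(self, name):
        # Proper deregistration on teardown is what avoids the collision
        # when the same metric name is registered again.
        self._beans.pop(name, None)

repo = MBeanRepository()
name = "org.apache.flink.metrics:key0=localhost,name=numRecordsIn"
repo.register(name, object())
try:
    repo.register(name, object())   # same name again -> rejected
except ValueError:
    print("duplicate rejected")
repo.unregister(name)
repo.register(name, object())       # fine after deregistration
```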

> JMXReporter doesn't properly register/deregister metrics
> 
>
> Key: FLINK-3962
> URL: https://issues.apache.org/jira/browse/FLINK-3962
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Stephan Ewen
> Fix For: 1.1.0
>
>
> The following fails our Yarn tests because it checks for errors in the 
> jobmanager/taskmanager logs:
> {noformat}
> 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter   
>   - A metric with the name 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>  was already registered.
> javax.management.InstanceAlreadyExistsException: 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>   at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>   at 
> org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76)
>   at 
> org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144)
>   at 
> org.apache.flink.metrics.groups.IOMetricGroup.(IOMetricGroup.java:40)
>   at 
> org.apache.flink.metrics.groups.TaskMetricGroup.(TaskMetricGroup.java:68)
>   at 
> org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86)
>   at 
> 

[jira] [Updated] (FLINK-3898) Adamic-Adar Similarity

2016-06-23 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3898:
--
Fix Version/s: 1.1.0

> Adamic-Adar Similarity
> --
>
> Key: FLINK-3898
> URL: https://issues.apache.org/jira/browse/FLINK-3898
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> The implementation of Adamic-Adar Similarity [0] is very close to Jaccard 
> Similarity. Whereas Jaccard Similarity counts common neighbors, Adamic-Adar 
> Similarity sums the inverse logarithm of the degree of common neighbors.
> Consideration will be given to the computation of the inverse logarithm, in 
> particular whether to pre-compute a small array of values.
> [0] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
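The formula being described can be sketched directly (plain Python on an adjacency-set graph, not the Gelly implementation):

```python
import math

def adamic_adar(graph, u, v):
    """AA(u, v) = sum over common neighbors w of 1 / log(deg(w)).
    graph: dict mapping each vertex to its set of neighbors; assumes
    every common neighbor has degree >= 2 so log(deg) is nonzero."""
    common = graph[u] & graph[v]
    return sum(1.0 / math.log(len(graph[w])) for w in common)

# Tiny undirected graph: c and d are the common neighbors of a and b.
graph = {
    "a": {"c", "d"},
    "b": {"c", "d"},
    "c": {"a", "b", "d"},
    "d": {"a", "b", "c"},
}
print(adamic_adar(graph, "a", "b"))  # 2 / ln(3), roughly 1.82
```

The issue's note about pre-computing the inverse logarithm corresponds to caching 1/log(k) for small k, since vertex degrees repeat heavily in real graphs.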





[jira] [Updated] (FLINK-4115) FsStateBackend filesystem verification can cause classpath exceptions

2016-06-23 Thread Josh Forman-Gornall (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Forman-Gornall updated FLINK-4115:
---
Component/s: (was: Core)
 Local Runtime

> FsStateBackend filesystem verification can cause classpath exceptions
> -
>
> Key: FLINK-4115
> URL: https://issues.apache.org/jira/browse/FLINK-4115
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.1.0
>Reporter: Josh Forman-Gornall
>Priority: Minor
>
> In the constructor of FsStateBackend, the FileSystem for the checkpoint 
> directory is initialised and it is verified that the checkpoint path exists. 
> This verification happens in the Flink client program when submitting a job 
> and can cause classpath issues if classes required to access the file system 
> are not available in the client's classpath.
> For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend 
> and an s3:// checkpoint directory, we get the below ClassNotFoundException. 
> This is because the jars needed to use the EMR file system are available only 
> in the YARN context and not when submitting the job via the Flink client.
> {noformat}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>   at 
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>   at 
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>   at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
>   at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:175)
>   at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:144)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.(RocksDBStateBackend.java:205)
> {noformat}





[jira] [Updated] (FLINK-4115) FsStateBackend filesystem verification can cause classpath exceptions

2016-06-23 Thread Josh Forman-Gornall (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Forman-Gornall updated FLINK-4115:
---
Description: 
In the constructor of FsStateBackend, the FileSystem for the checkpoint 
directory is initialised and it is verified that the checkpoint path exists. 
This verification happens in the Flink client program when submitting a job and 
can cause classpath issues if classes required to access the file system are 
not available in the client's classpath.

For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend and 
an s3:// checkpoint directory, we get the below ClassNotFoundException. This is 
because the jars needed to use the EMR file system are available only in the 
YARN context and not when submitting the job via the Flink client.

{noformat}
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
at 
org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:175)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:144)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.(RocksDBStateBackend.java:205)
{noformat}

  was:
In the constructor of FsStateBackend, the FileSystem for the checkpoint 
directory is initialised and it is verified that the checkpoint path exists. 
This verification happens in the Flink client program when submitting a job and 
can cause classpath issues if classes required to access the file system are 
not available in the client's classpath.

For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend and 
an s3:// checkpoint directory, we get the below ClassNotFoundException. This is 
because the jars needed to use the EMR file system are available only in the 
YARN context and not when submitting the job via the Flink client.

```
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
at 
org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:175)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:144)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.(RocksDBStateBackend.java:205)
```


> FsStateBackend filesystem verification can cause classpath exceptions
> -
>
> Key: FLINK-4115
> URL: https://issues.apache.org/jira/browse/FLINK-4115
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Josh Forman-Gornall
>Priority: Minor
>
> In the constructor of FsStateBackend, the FileSystem for the checkpoint 
> directory is initialised and it is verified that the checkpoint path exists. 
> This verification happens in the Flink client program when submitting a job 
> and can cause classpath issues if classes required to access the file system 
> are not available in the client's classpath.
> For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend 
> and an s3:// checkpoint directory, we get the below ClassNotFoundException. 
> This is because the jars needed to use the EMR file system are available only 
> in the YARN context and not when submitting the job via the Flink client.
> {noformat}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>   at 
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>   at 
> 

[jira] [Created] (FLINK-4115) FsStateBackend filesystem verification can cause classpath exceptions

2016-06-23 Thread Josh Forman-Gornall (JIRA)
Josh Forman-Gornall created FLINK-4115:
--

 Summary: FsStateBackend filesystem verification can cause 
classpath exceptions
 Key: FLINK-4115
 URL: https://issues.apache.org/jira/browse/FLINK-4115
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.1.0
Reporter: Josh Forman-Gornall
Priority: Minor


In the constructor of FsStateBackend, the FileSystem for the checkpoint 
directory is initialised and it is verified that the checkpoint path exists. 
This verification happens in the Flink client program when submitting a job and 
can cause classpath issues if classes required to access the file system are 
not available in the client's classpath.

For example, if we run Flink on YARN over AWS EMR using RocksDBStateBackend and 
an s3:// checkpoint directory, we get the below ClassNotFoundException. This is 
because the jars needed to use the EMR file system are available only in the 
YARN context and not when submitting the job via the Flink client.

```
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
at 
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
at 
org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:175)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.(FsStateBackend.java:144)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.(RocksDBStateBackend.java:205)
```





[jira] [Created] (FLINK-4114) Need a way to manage multiple named, long-lived jobs on a single YARN cluster in an automated manner

2016-06-23 Thread Chris Hogue (JIRA)
Chris Hogue created FLINK-4114:
--

 Summary: Need a way to manage multiple named, long-lived jobs on a 
single YARN cluster in an automated manner
 Key: FLINK-4114
 URL: https://issues.apache.org/jira/browse/FLINK-4114
 Project: Flink
  Issue Type: Bug
Reporter: Chris Hogue


We are running several Flink jobs on a single YARN cluster. Currently each 
Flink job is run in its own YARN session (and thus its own YARN application 
ID). The difficulty comes in that we want to manage each of these jobs 
individually by name. For example we want to start, stop, update one job 
without affecting others. The primary handle on these jobs is the YARN 
application ID, which does not indicate which Flink job it is running.

It would be nice if we had tools that would allow us to manage the flink jobs 
by name and have it do the right thing with the YARN session. Today we can use 
'flink run' and have it start a YARN session for that job, but from that point 
forward we have only the YARN application ID to work with.

As a concrete example, suppose we have two jobs named JobA and JobB. We'd 
want a way to do something like: 
flink run ; flink run 

We'd then want to be able to call:

flink cancel JobA

The cancel command would spin down the YARN session for JobA in addition to the 
Flink job, leaving JobB running as normal. I've simplified the commands, leaving 
out other options, for illustrative purposes. We'll also want to be able to use 
savepoints through these steps.
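The requested behaviour amounts to client-side bookkeeping from job name to YARN session. A hypothetical sketch (no such tool ships with Flink as of this thread; the class and application ids are invented):

```python
class NamedJobRegistry:
    """Hypothetical mapping of user-chosen job names to YARN application
    ids, so jobs can be started and cancelled by name."""
    def __init__(self):
        self._apps = {}

    def start(self, name, yarn_app_id):
        if name in self._apps:
            raise ValueError(f"job name already in use: {name}")
        self._apps[name] = yarn_app_id

    def cancel(self, name):
        # A real tool would cancel the Flink job (optionally with a
        # savepoint) and tear down its YARN session; here we only
        # resolve and drop the mapping.
        return self._apps.pop(name)

reg = NamedJobRegistry()
reg.start("JobA", "application_1466700000000_0001")
reg.start("JobB", "application_1466700000000_0002")
print(reg.cancel("JobA"))  # application_1466700000000_0001
print(sorted(reg._apps))   # JobB is unaffected
```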








[jira] [Commented] (FLINK-4113) Always copy first value in ChainedAllReduceDriver

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15347023#comment-15347023
 ] 

ASF GitHub Bot commented on FLINK-4113:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2156

[FLINK-4113] [runtime] Always copy first value in ChainedAllReduceDriver



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4113_always_copy_first_value_in_chainedallreducedriver

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2156.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2156


commit f5f162b591b38e890758dc65fee93749ac86a347
Author: Greg Hogan 
Date:   2016-06-23T16:37:37Z

[FLINK-4113] [runtime] Always copy first value in ChainedAllReduceDriver




> Always copy first value in ChainedAllReduceDriver
> -
>
> Key: FLINK-4113
> URL: https://issues.apache.org/jira/browse/FLINK-4113
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.1.0, 1.0.4
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Critical
> Fix For: 1.1.0, 1.0.4
>
>
> {{ChainedAllReduceDriver.collect}} must copy the first record even when 
> object reuse is enabled or {{base}} may later point to the same object as 
> {{record}}.
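The aliasing bug is language-independent; a minimal Python model (the real driver is Java) of why the first value must be copied even when object reuse is enabled:

```python
def chained_all_reduce(values, copy_first):
    """Models the driver's object-reuse mode: the runtime refills one
    mutable 'record' object per input. If the first value is kept by
    reference, 'base' aliases 'record' and is clobbered by later inputs."""
    record = [None]                 # the single reused record object
    base = None
    for v in values:
        record[0] = v               # runtime overwrites the reused object
        if base is None:
            base = list(record) if copy_first else record
        else:
            base[0] = base[0] * record[0]   # reduce function: product
    return base[0]

print(chained_all_reduce([1, 2, 3], copy_first=True))   # 6 (correct)
print(chained_all_reduce([1, 2, 3], copy_first=False))  # 9 (first value lost)
```

Without the copy, refilling the record for the second input silently rewrites the running aggregate before the reduce function ever runs.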





[GitHub] flink pull request #2156: [FLINK-4113] [runtime] Always copy first value in ...

2016-06-23 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2156

[FLINK-4113] [runtime] Always copy first value in ChainedAllReduceDriver



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 
4113_always_copy_first_value_in_chainedallreducedriver

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2156.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2156


commit f5f162b591b38e890758dc65fee93749ac86a347
Author: Greg Hogan 
Date:   2016-06-23T16:37:37Z

[FLINK-4113] [runtime] Always copy first value in ChainedAllReduceDriver






[jira] [Commented] (FLINK-3879) Native implementation of HITS algorithm

2016-06-23 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346899#comment-15346899
 ] 

Greg Hogan commented on FLINK-3879:
---

[~vkalavri] here are the timings I get on an AWS c4.2xlarge using FLINK-3879 
rebased to master and merged with pr1517 (hash-based combine). Each execution 
is performing 10 iterations.

FLINK-3879 (HITS):

$ for i in `seq 10 2 20` ; do echo ; echo $i ; ./bin/flink run -q --class 
org.apache.flink.graph.examples.HITS 
~/flink-gelly-examples_2.10-1.1-SNAPSHOT.jar --input rmat --scale $i --output 
hash --algorithm HITS ; done

Scale 10:
ChecksumHashCode 0x018d7c49ca4a, count 902
Execution runtime: 1,034 ms

Scale 12:
ChecksumHashCode 0x058a6efc82d1, count 3349
Execution runtime: 1,115 ms

Scale 14:
ChecksumHashCode 0x15030ecf3188, count 12472
Execution runtime: 1,974 ms

Scale 16:
ChecksumHashCode 0x4f23492849eb, count 46826
Execution runtime: 5,843 ms

Scale 18:
ChecksumHashCode 0x0001267c806b8338, count 174010
Execution runtime: 21,927 ms

Scale 20:
ChecksumHashCode 0x000449bd0da45343, count 646203
Execution runtime: 93,488 ms

FLINK-2044 (HITSAlgorithm):

$ for i in `seq 10 2 20` ; do echo ; echo $i ; ./bin/flink run -q --class 
org.apache.flink.graph.examples.HITS 
~/flink-gelly-examples_2.10-1.1-SNAPSHOT.jar --input rmat --scale $i --output 
hash --algorithm HITSAlgorithm ; done

Scale 10:
ChecksumHashCode 0x01c88b0818f0, count 902
Execution runtime: 761 ms

Scale 12:
Cluster retrieved
ChecksumHashCode 0x069bcad3b322, count 3349
Execution runtime: 1,094 ms

Scale 14:
ChecksumHashCode 0x186c50950fea, count 12472
Execution runtime: 2,290 ms

Scale 16:
ChecksumHashCode 0x5b741faf30eb, count 46826
Execution runtime: 6,898 ms

Scale 18:
ChecksumHashCode 0x000153e520a8306c, count 174010
Execution runtime: 28,015 ms

Scale 20:
ChecksumHashCode 0x0004ed44e75c493a, count 646203
Execution runtime: 120,736 ms

> Native implementation of HITS algorithm
> ---
>
> Key: FLINK-3879
> URL: https://issues.apache.org/jira/browse/FLINK-3879
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Hyperlink-Induced Topic Search (HITS, also "hubs and authorities") is 
> presented in [0] and described in [1].
> "[HITS] is a very popular and effective algorithm to rank documents based on 
> the link information among a set of documents. The algorithm presumes that a 
> good hub is a document that points to many others, and a good authority is a 
> document that many documents point to." 
> [https://pdfs.semanticscholar.org/a8d7/c7a4c53a9102c4239356f9072ec62ca5e62f.pdf]
> This implementation differs from FLINK-2044 by providing for convergence, 
> outputting both hub and authority scores, and completing in half the number 
> of iterations.
> [0] http://www.cs.cornell.edu/home/kleinber/auth.pdf
> [1] https://en.wikipedia.org/wiki/HITS_algorithm





[jira] [Commented] (FLINK-4098) Iteration support in Python API

2016-06-23 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346881#comment-15346881
 ] 

Chesnay Schepler commented on FLINK-4098:
-

At that point it is completely normal for the Java side to wait. Under usual 
circumstances the operator would be configured, read the broadcastVariables 
(which Java writes without waiting for any acknowledgement), and then 
explicitly request data from Java (until which Java blocks).

So let's see. We know the Python process is created, which is good. We also 
know that the function is created, but never run. This means that something 
went wrong with the setup of the operator, which narrows the failure down to 
the else block in Environment.execute(). I would (and will, over the weekend :)) 
plaster that code with debug statements.

Also, remember to call sys.stdout.flush() after print! This small detail has 
caused me quite some trouble :)
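The flush detail matters because the worker's stdout is block-buffered when not attached to a terminal; if the process then hangs, buffered prints never reach the log. A minimal pattern (the helper name is just for the example):

```python
import sys

def debug(msg):
    # Force buffered output out immediately; without the flush, a
    # worker that later hangs or is killed may show no output at all.
    print(msg)
    sys.stdout.flush()

debug("entering operator setup")
```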



> Iteration support in Python API
> ---
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Affects Versions: 1.0.2
>Reporter: Geoffrey Mon
>Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently 
> working on this at https://github.com/GEOFBOT/flink





[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346830#comment-15346830
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
@rmetzger tagging just to make sure you're notified of this PR :)
When will you be free to review? I'm just asking for my own time allocation, 
so I know when to continue working on the Kinesis connector. If there's anything 
majorly wrong with the implementation explained in the above comment, please let 
me know and I'll try to address it before you put effort into a detailed review.
Thanks in advance!


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shard reassignment when a task finds 
> a closed shard at runtime (shards are closed by Kinesis when they are 
> merged or split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that would require the user to set up ZK for it 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.
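The static round-robin distribution described above can be sketched as follows. This is a hypothetical illustration, not the actual consumer code: each subtask derives its shard set locally from its index and the parallelism, which is exactly the mapping that breaks once Kinesis closes shards during a merge or split:

```python
def shards_for_subtask(shard_ids, subtask_index, parallelism):
    """Locally determined round-robin shard assignment (illustrative sketch).

    Every subtask sorts the shard list the same way and takes every
    `parallelism`-th shard starting at its own index, so no coordination
    between subtasks is needed. The mapping is static: it cannot absorb
    shards that Kinesis closes or opens during resharding.
    """
    return [shard for i, shard in enumerate(sorted(shard_ids))
            if i % parallelism == subtask_index]
```
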







[jira] [Commented] (FLINK-4110) Provide testing skeleton in quickstarts

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346794#comment-15346794
 ] 

ASF GitHub Bot commented on FLINK-4110:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2153
  
This will be very helpful! LGTM.


> Provide testing skeleton in quickstarts
> ---
>
> Key: FLINK-4110
> URL: https://issues.apache.org/jira/browse/FLINK-4110
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Quickstarts
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>
> Users often ask how they can test their Flink code.
> I suggest providing a very simple skeleton / example in the quickstarts, 
> showing how it's done.





[jira] [Commented] (FLINK-4110) Provide testing skeleton in quickstarts

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346793#comment-15346793
 ] 

ASF GitHub Bot commented on FLINK-4110:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2153#discussion_r68276459
  
--- Diff: 
flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/test/java/TestSkeleton.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestUtils;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Skeleton class showing how to write integration tests against a running 
embedded Flink
+ * testing cluster.
+ */
+public class TestSkeleton {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(TestSkeleton.class);
+   private static ForkableFlinkMiniCluster flink;
+   private static int flinkPort;
+
+   /**
+* Start a re-usable Flink mini cluster
+*/
+   @BeforeClass
+   public static void setupFlink() {
+   Configuration flinkConfig = new Configuration();
+   
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+   
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+   
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+   
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
--- End diff --

Is this config related to any of the demonstrated tests? If not, perhaps it 
can be removed.


> Provide testing skeleton in quickstarts
> ---
>
> Key: FLINK-4110
> URL: https://issues.apache.org/jira/browse/FLINK-4110
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Quickstarts
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>
> Users often ask how they can test their Flink code.
> I suggest providing a very simple skeleton / example in the quickstarts, 
> showing how it's done.









[jira] [Commented] (FLINK-4110) Provide testing skeleton in quickstarts

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346750#comment-15346750
 ] 

ASF GitHub Bot commented on FLINK-4110:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2153#discussion_r68272113
  
--- Diff: 
flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/test/java/TestSkeleton.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package};
+
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestUtils;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Skeleton class showing how to write integration tests against a running 
embedded Flink
+ * testing cluster.
+ */
+public class TestSkeleton {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(TestSkeleton.class);
+   private static ForkableFlinkMiniCluster flink;
+   private static int flinkPort;
+
+   /**
+* Start a re-usable Flink mini cluster
+*/
+   @BeforeClass
+   public static void setupFlink() {
+   Configuration flinkConfig = new Configuration();
+   
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+   
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+   
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+   
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+   flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+   flink.start();
+
+   flinkPort = flink.getLeaderRPCPort();
+   LOG.info("Started a Flink testing cluster on port {}", 
flinkPort);
+   }
+
+
+   /**
+* This test uses a finite stream. The TestingSource will stop after 
1000 elements have been send.
--- End diff --

send --> sent


> Provide testing skeleton in quickstarts
> ---
>
> Key: FLINK-4110
> URL: https://issues.apache.org/jira/browse/FLINK-4110
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Quickstarts
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>
> Users often ask how they can test their Flink code.
> I suggest providing a very simple skeleton / example in the quickstarts, 
> showing how it's done.









[jira] [Commented] (FLINK-1946) Make yarn tests logging less verbose

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346735#comment-15346735
 ] 

ASF GitHub Bot commented on FLINK-1946:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2147
  
+1


> Make yarn tests logging less verbose
> 
>
> Key: FLINK-1946
> URL: https://issues.apache.org/jira/browse/FLINK-1946
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN Client
>Reporter: Till Rohrmann
>Priority: Minor
>
> Currently, the yarn tests log on the INFO level making the test outputs 
> confusing. Furthermore, some status messages are written to stdout. I think 
> these messages do not need to be shown to the user.







[jira] [Commented] (FLINK-1946) Make yarn tests logging less verbose

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346726#comment-15346726
 ] 

ASF GitHub Bot commented on FLINK-1946:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2147#discussion_r68269922
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -750,13 +751,16 @@ protected YarnClusterClient deployInternal() throws 
Exception {
LOG.info("YARN application has been 
deployed successfully.");
break loop;
default:
-   LOG.info("Deploying cluster, current 
state " + appState);
-   if(waittime > 6) {
+   if (appState != lastAppState) {
+   LOG.info("Deploying cluster, 
current state " + appState);
+   }
+   if(waitTime > 6) {
--- End diff --

missing space after if


> Make yarn tests logging less verbose
> 
>
> Key: FLINK-1946
> URL: https://issues.apache.org/jira/browse/FLINK-1946
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN Client
>Reporter: Till Rohrmann
>Priority: Minor
>
> Currently, the yarn tests log on the INFO level making the test outputs 
> confusing. Furthermore, some status messages are written to stdout. I think 
> these messages do not need to be shown to the user.





[GitHub] flink issue #2151: [hotfix][docs] Add warning to Cassandra documentation

2016-06-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2151
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4098) Iteration support in Python API

2016-06-23 Thread Geoffrey Mon (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346713#comment-15346713
 ] 

Geoffrey Mon commented on FLINK-4098:
-

After adding some more print statements in different places, I found that 
running a normal {{.map()}} on a DataSet caused {{MapFunction._run()}}, 
{{MapFunction._configure()}}, and {{MapFunction.collect()}} to run, while 
running {{.map()}} as part of an iteration triggered none of these functions. 
The MapFunction instance was initialized but never used. Does this mean the 
Java side of Flink was waiting for the Python side to return its results, 
causing the hang?
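To narrow down which functions never run, a lightweight tracing decorator can announce entry to each suspect method and flush immediately, so a later hang cannot swallow the message. This is a hypothetical debugging aid, not part of the Flink Python API:

```python
import functools
import sys

def traced(fn):
    """Decorator sketch: print entry into fn and flush stdout,
    so the message survives even if the call later blocks."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        print("entering %s" % fn.__name__)
        sys.stdout.flush()
        return fn(*args, **kwargs)
    return wrapper
```
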

> Iteration support in Python API
> ---
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Affects Versions: 1.0.2
>Reporter: Geoffrey Mon
>Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently 
> working on this at https://github.com/GEOFBOT/flink





[jira] [Created] (FLINK-4113) Always copy first value in ChainedAllReduceDriver

2016-06-23 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4113:
-

 Summary: Always copy first value in ChainedAllReduceDriver
 Key: FLINK-4113
 URL: https://issues.apache.org/jira/browse/FLINK-4113
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.1.0, 1.0.4
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Critical
 Fix For: 1.1.0, 1.0.4


{{ChainedAllReduceDriver.collect}} must copy the first record even when object 
reuse is enabled or {{base}} may later point to the same object as {{record}}.
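The aliasing hazard can be modeled in a few lines. This is a hypothetical sketch of the object-reuse problem, not the actual driver code: if the first record is not copied, the aggregate keeps pointing at the mutable record object that the runtime will overwrite in place:

```python
import copy

class AllReduceSketch:
    """Toy model of a chained all-reduce collector under object reuse."""

    def __init__(self, reduce_fn, copy_first):
        self.reduce_fn = reduce_fn
        self.copy_first = copy_first
        self.base = None  # running aggregate

    def collect(self, record):
        if self.base is None:
            # Without the copy, base aliases the reused record object and
            # silently changes when the runtime rewrites it in place.
            self.base = copy.deepcopy(record) if self.copy_first else record
        else:
            self.base = self.reduce_fn(self.base, record)
```
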





[jira] [Commented] (FLINK-4093) Expose metric interfaces

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346712#comment-15346712
 ] 

ASF GitHub Bot commented on FLINK-4093:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2134#discussion_r68268650
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 ---
@@ -146,7 +147,16 @@ public Counter counter(int name) {
 
@Override
public Counter counter(String name) {
-   Counter counter = new Counter();
+   return counter(name, new SimpleCounter());
+   }
+   
+   @Override
+   public  C counter(int name, C counter) {
--- End diff --

we should change it for the other methods as well.


> Expose metric interfaces
> 
>
> Key: FLINK-4093
> URL: https://issues.apache.org/jira/browse/FLINK-4093
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>








[jira] [Commented] (FLINK-4093) Expose metric interfaces

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346686#comment-15346686
 ] 

ASF GitHub Bot commented on FLINK-4093:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2134#discussion_r68265180
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 ---
@@ -146,7 +147,16 @@ public Counter counter(int name) {
 
@Override
public Counter counter(String name) {
-   Counter counter = new Counter();
+   return counter(name, new SimpleCounter());
+   }
+   
+   @Override
+   public  C counter(int name, C counter) {
--- End diff --

Ah true. Good idea :-) Maybe we should change it for the `gauge` and 
`histogram` functions as well.


> Expose metric interfaces
> 
>
> Key: FLINK-4093
> URL: https://issues.apache.org/jira/browse/FLINK-4093
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>








[jira] [Closed] (FLINK-3800) ExecutionGraphs can become orphans

2016-06-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-3800.

Resolution: Fixed

Fixed via 6420c1c264ed3ce0c32ba164c2cdb85ccdccf265

> ExecutionGraphs can become orphans
> --
>
> Key: FLINK-3800
> URL: https://issues.apache.org/jira/browse/FLINK-3800
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{JobManager.cancelAndClearEverything}} method fails all currently 
> executed jobs on the {{JobManager}} and then clears the list of 
> {{currentJobs}} kept in the JobManager. This can become problematic if the 
> user has set a restart strategy for a job, because the {{RestartStrategy}} 
> will try to restart the job. This can lead to unwanted re-deployments of the 
> job which consumes resources and thus will trouble the execution of other 
> jobs. If the restart strategy never stops, then this prevents that the 
> {{ExecutionGraph}} from ever being properly terminated.







[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346669#comment-15346669
 ] 

ASF GitHub Bot commented on FLINK-3800:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2096


> ExecutionGraphs can become orphans
> --
>
> Key: FLINK-3800
> URL: https://issues.apache.org/jira/browse/FLINK-3800
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{JobManager.cancelAndClearEverything}} method fails all currently 
> executed jobs on the {{JobManager}} and then clears the list of 
> {{currentJobs}} kept in the JobManager. This can become problematic if the 
> user has set a restart strategy for a job, because the {{RestartStrategy}} 
> will try to restart the job. This can lead to unwanted re-deployments of the 
> job, which consume resources and thus disturb the execution of other 
> jobs. If the restart strategy never stops, this prevents the 
> {{ExecutionGraph}} from ever being properly terminated.





[jira] [Issue Comment Deleted] (FLINK-4098) Iteration support in Python API

2016-06-23 Thread Geoffrey Mon (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Geoffrey Mon updated FLINK-4098:

Comment: was deleted

(was: Maybe it would be simpler to implement iterations in Python similar to 
aggregations?)

> Iteration support in Python API
> ---
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Affects Versions: 1.0.2
>Reporter: Geoffrey Mon
>Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently 
> working on this at https://github.com/GEOFBOT/flink





[jira] [Commented] (FLINK-4098) Iteration support in Python API

2016-06-23 Thread Geoffrey Mon (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346664#comment-15346664
 ] 

Geoffrey Mon commented on FLINK-4098:
-

Maybe it would be simpler to implement iterations in Python similar to 
aggregations?

> Iteration support in Python API
> ---
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Affects Versions: 1.0.2
>Reporter: Geoffrey Mon
>Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently 
> working on this at https://github.com/GEOFBOT/flink





[jira] [Updated] (FLINK-4061) about flink jdbc connect oracle db exists a critical bug

2016-06-23 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4061:
--
Priority: Major  (was: Blocker)

>  about flink jdbc connect oracle db exists a critical bug
> ---
>
> Key: FLINK-4061
> URL: https://issues.apache.org/jira/browse/FLINK-4061
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API
>Affects Versions: 1.1.0
> Environment: Ubuntu, JDK 1.8.0, local Flink cluster
>Reporter: dengchangfu
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I use flink-jdbc to connect to an Oracle DB for ETL, so I wrote a demo to test 
> the feature. The code is simple, but after I submitted this app, an exception 
> occurred. The exception info looks like this:
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:231)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> my code like this:
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
> import org.apache.flink.api.table.Row;
> import org.apache.flink.api.table.typeutils.RowTypeInfo;
> import 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
> import java.sql.ResultSet;
> import java.sql.Types;
> /**
>  * Skeleton for a Flink Job.
>  *
>  * For a full example of a Flink Job, see the WordCountJob.java file in the
>  * same package/directory or have a look at the website.
>  *
>  * You can also generate a .jar file that you can submit on your Flink
>  * cluster.
>  * Just type
>  *mvn clean package
>  * in the projects root directory.
>  * You will find the jar in
>  *target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
>  *
>  */
> public class Job {
> public static final TypeInformation<?>[] fieldTypes = new 
> TypeInformation<?>[]{
> BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.FLOAT_TYPE_INFO
> };
> public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
> public static void main(String[] args) {
> // set up the execution environment
> ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> JDBCInputFormatBuilder inputBuilder = 
> JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
> .setUsername("crmii")
> .setPassword("crmii")
> .setQuery("select CLIENT_ID,OCCUR_BALANCE from 
> HS_ASSET.FUNDJOUR@OTC")
> .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
> .setRowTypeInfo(rowTypeInfo);
> DataSet<Row> source = env.createInput(inputBuilder.finish());
> source.output(JDBCOutputFormat.buildJDBCOutputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
> .setUsername("crmii")
> .setPassword("crmii")
> .setQuery("insert into dengabc (client_id,salary) 
> values(?,?)")
> .setSqlTypes(new int[]{Types.VARCHAR, Types.DOUBLE})
> .finish());
> //source.print();
> //source.first(20).print();
> //dbData.print();
> /**
>  * Here, you can start creating your execution plan for Flink.
>  *
>  * Start with getting some data from the environment, like
>  *env.readTextFile(textPath);
>  *
>  * then, transform the resulting DataSet using operations
>  * like
>  *.filter()
>  *.flatMap()
>  *.join()
>  *.coGroup()
>  * and many more.
>  * Have a look at the programming guide for the Java API:
>  *
>  * http://flink.apache.org/docs/latest/apis/batch/index.html
>  *
>  * and the examples
>  *
>  * http://flink.apache.org/docs/latest/apis/batch/examples.html
>  *
>  */
> // execute program
> try {
> env.execute("Flink Java API Skeleton");
> } catch (Exception e) {
> e.getMessage();
> }
> }
> }
> my pom.xml like this:
> 
> http://maven.apache.org/POM/4.0.0; 
> 

[jira] [Commented] (FLINK-4112) Replace SuppressRestartException with disabling RestartStrategy

2016-06-23 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346638#comment-15346638
 ] 

Chesnay Schepler commented on FLINK-4112:
-

Does this entail removing the SuppressRestartException class?

> Replace SuppressRestartException with disabling RestartStrategy
> ---
>
> Key: FLINK-4112
> URL: https://issues.apache.org/jira/browse/FLINK-4112
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Minor
> Fix For: 1.2.0
>
>
> Currently, Flink uses {{SuppressRestartExceptions}} to suppress job restarts. 
> This mechanism can be used by different components to control the lifecycle 
> of a job. Since the control flow of exceptions can be quite difficult to 
> retrace, I propose to replace this mechanism with an explicit 
> {{RestartStrategy}} disable method. So instead of throwing a 
> {{SuppressRestartException}} we should disable the {{RestartStrategy}} to 
> avoid further job restarts.
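The proposed control flow can be sketched in a few lines. This is a minimal, hypothetical model of the idea, not Flink's actual API — `RestartStrategy`, `disable()`, and `restartJob()` are illustrative names:

```java
// Hypothetical sketch: components disable the restart strategy explicitly
// instead of throwing a SuppressRestartException up the call stack.
public class RestartStrategyDemo {

    static class RestartStrategy {
        private boolean disabled;
        private int restarts;

        // Proposed explicit disable, replacing SuppressRestartException.
        void disable() {
            disabled = true;
        }

        boolean canRestart() {
            return !disabled;
        }

        // Called on job failure; restarts only while the strategy is enabled.
        void restartJob() {
            if (canRestart()) {
                restarts++;
            }
        }

        int getRestarts() {
            return restarts;
        }
    }

    public static void main(String[] args) {
        RestartStrategy strategy = new RestartStrategy();
        strategy.restartJob();  // ordinary failure: the job is restarted
        strategy.disable();     // e.g. during JobManager shutdown
        strategy.restartJob();  // suppressed: no further restart happens
        System.out.println(strategy.getRestarts()); // prints 1
    }
}
```

The state lives in one place, so a reader can see why a restart did or did not happen, which is the traceability argument made in the issue.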





[jira] [Commented] (FLINK-4098) Iteration support in Python API

2016-06-23 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346634#comment-15346634
 ] 

Chesnay Schepler commented on FLINK-4098:
-

print statements should appear in the taskmanager.out

> Iteration support in Python API
> ---
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Affects Versions: 1.0.2
>Reporter: Geoffrey Mon
>Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently 
> working on this at https://github.com/GEOFBOT/flink





[jira] [Commented] (FLINK-4093) Expose metric interfaces

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346611#comment-15346611
 ] 

ASF GitHub Bot commented on FLINK-4093:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2134#discussion_r68256978
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 ---
@@ -146,7 +147,16 @@ public Counter counter(int name) {
 
@Override
public Counter counter(String name) {
-   Counter counter = new Counter();
+   return counter(name, new SimpleCounter());
+   }
+   
+   @Override
+   public <C extends Counter> C counter(int name, C counter) {
--- End diff --

in other words, it allows instantiation, registration, and assignment to 
a variable or field in a single line without losing the subclass type.
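The type-preservation point can be shown with a self-contained sketch. The classes below are simplified stand-ins for `Counter`, `SimpleCounter`, and the metric group, not Flink's actual implementations:

```java
// Sketch of why the bounded type parameter <C extends Counter> matters:
// the registered instance comes back with its concrete type.
import java.util.HashMap;
import java.util.Map;

public class GenericCounterDemo {

    interface Counter {
        void inc();
        long getCount();
    }

    static class SimpleCounter implements Counter {
        private long count;
        public void inc() { count++; }
        public long getCount() { return count; }
    }

    // A user-defined subclass with extra state beyond the Counter interface.
    static class MaxCounter extends SimpleCounter {
        long max;
        @Override
        public void inc() {
            super.inc();
            max = Math.max(max, getCount());
        }
    }

    static class MetricGroup {
        private final Map<String, Counter> metrics = new HashMap<>();

        // Returning C instead of Counter keeps the caller's subclass type,
        // so registration and typed assignment fit in one line.
        <C extends Counter> C counter(String name, C counter) {
            metrics.put(name, counter);
            return counter;
        }
    }

    public static void main(String[] args) {
        MetricGroup group = new MetricGroup();
        // One line: instantiation, registration, and typed assignment.
        MaxCounter c = group.counter("myMax", new MaxCounter());
        c.inc();
        c.inc();
        System.out.println(c.max); // prints 2; with a plain Counter return
                                   // type, accessing 'max' would need a cast
    }
}
```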


> Expose metric interfaces
> 
>
> Key: FLINK-4093
> URL: https://issues.apache.org/jira/browse/FLINK-4093
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>






[GitHub] flink pull request #2134: [FLINK-4093] Expose metric interfaces

2016-06-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2134#discussion_r68256978
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 ---
@@ -146,7 +147,16 @@ public Counter counter(int name) {
 
@Override
public Counter counter(String name) {
-   Counter counter = new Counter();
+   return counter(name, new SimpleCounter());
+   }
+   
+   @Override
+   public <C extends Counter> C counter(int name, C counter) {
--- End diff --

in other words, it allows instantiation, registration, and assignment to 
a variable or field in a single line without losing the subclass type.




[jira] [Comment Edited] (FLINK-4098) Iteration support in Python API

2016-06-23 Thread Geoffrey Mon (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346600#comment-15346600
 ] 

Geoffrey Mon edited comment on FLINK-4098 at 6/23/16 3:26 PM:
--

I wasn't sure how to put in print statements because they don't seem to appear 
in the jobmanager {{.out}} when I add them into {{MapFunction._run()}}.

Here's a trimmed jobmanager log for a simple Python iteration program that 
iterates each number in a Dataset:

{code}
INFO  org.apache.flink.runtime.jobmanager.JobManager- Status of 
job ac35083fec7c77cc1707245563aed90d (Flink Java Job at Thu Jun 23 11:14:51 EDT 
2016) changed to RUNNING.

INFO  org.apache.flink.runtime.taskmanager.Task - Obtaining 
local cache file for 'flink'
INFO  org.apache.flink.runtime.taskmanager.Task - 
PartialSolution (Bulk Iteration) (1/1) switched to RUNNING
INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph- 
PartialSolution (Bulk Iteration) (1/1) (5c0ff0bae3952df909198ebf94f51dcf) 
switched from DEPLOYING to RUNNING
INFO  org.apache.flink.runtime.iterative.task.IterationHeadTask - starting 
iteration [1]:  PartialSolution (Bulk Iteration) (1/1)
INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph- 
MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from 
CREATED to SCHEDULED
INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph- 
MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from 
SCHEDULED to DEPLOYING

INFO  org.apache.flink.runtime.taskmanager.Task - 
MapPartition (PythonMap) (1/1) switched to RUNNING
INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph- 
MapPartition (PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from 
DEPLOYING to RUNNING
INFO  org.apache.flink.runtime.iterative.task.IterationTailTask - starting 
iteration [1]:  MapPartition (PythonMap) (1/1)
{code}
The program does not proceed past that point and appears to hang without any 
errors.


was (Author: hydronium):
Here's a jobmanager log for a simple Python iteration program that iterates 
each number in a Dataset:

2016-06-23 11:14:51,900 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Status of job ac35083fec7c77cc1707245563aed90d (Flink Java Job at 
Thu Jun 23 11:14:51 EDT 2016) changed to RUNNING.

2016-06-23 11:14:52,266 INFO  org.apache.flink.runtime.taskmanager.Task 
- Obtaining local cache file for 'flink'
2016-06-23 11:14:52,267 INFO  org.apache.flink.runtime.taskmanager.Task 
- PartialSolution (Bulk Iteration) (1/1) switched to RUNNING
2016-06-23 11:14:52,268 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- PartialSolution 
(Bulk Iteration) (1/1) (5c0ff0bae3952df909198ebf94f51dcf) switched from 
DEPLOYING to RUNNING
2016-06-23 11:14:53,053 INFO  
org.apache.flink.runtime.iterative.task.IterationHeadTask - starting 
iteration [1]:  PartialSolution (Bulk Iteration) (1/1)
2016-06-23 11:14:53,059 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition 
(PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from CREATED to 
SCHEDULED
2016-06-23 11:14:53,059 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition 
(PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from SCHEDULED to 
DEPLOYING

2016-06-23 11:14:53,077 INFO  org.apache.flink.runtime.taskmanager.Task 
- MapPartition (PythonMap) (1/1) switched to RUNNING
2016-06-23 11:14:53,078 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition 
(PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from DEPLOYING to 
RUNNING
2016-06-23 11:14:53,155 INFO  
org.apache.flink.runtime.iterative.task.IterationTailTask - starting 
iteration [1]:  MapPartition (PythonMap) (1/1)

The program does not proceed past that point and appears to hang without any 
errors.

> Iteration support in Python API
> ---
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Affects Versions: 1.0.2
>Reporter: Geoffrey Mon
>Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently 
> working on this at https://github.com/GEOFBOT/flink





[jira] [Commented] (FLINK-4098) Iteration support in Python API

2016-06-23 Thread Geoffrey Mon (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346600#comment-15346600
 ] 

Geoffrey Mon commented on FLINK-4098:
-

Here's a jobmanager log for a simple Python iteration program that iterates 
each number in a Dataset:

2016-06-23 11:14:51,900 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Status of job ac35083fec7c77cc1707245563aed90d (Flink Java Job at 
Thu Jun 23 11:14:51 EDT 2016) changed to RUNNING.

2016-06-23 11:14:52,266 INFO  org.apache.flink.runtime.taskmanager.Task 
- Obtaining local cache file for 'flink'
2016-06-23 11:14:52,267 INFO  org.apache.flink.runtime.taskmanager.Task 
- PartialSolution (Bulk Iteration) (1/1) switched to RUNNING
2016-06-23 11:14:52,268 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- PartialSolution 
(Bulk Iteration) (1/1) (5c0ff0bae3952df909198ebf94f51dcf) switched from 
DEPLOYING to RUNNING
2016-06-23 11:14:53,053 INFO  
org.apache.flink.runtime.iterative.task.IterationHeadTask - starting 
iteration [1]:  PartialSolution (Bulk Iteration) (1/1)
2016-06-23 11:14:53,059 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition 
(PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from CREATED to 
SCHEDULED
2016-06-23 11:14:53,059 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition 
(PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from SCHEDULED to 
DEPLOYING

2016-06-23 11:14:53,077 INFO  org.apache.flink.runtime.taskmanager.Task 
- MapPartition (PythonMap) (1/1) switched to RUNNING
2016-06-23 11:14:53,078 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition 
(PythonMap) (1/1) (107be54ae5b60776dc35cc2ace83a2f0) switched from DEPLOYING to 
RUNNING
2016-06-23 11:14:53,155 INFO  
org.apache.flink.runtime.iterative.task.IterationTailTask - starting 
iteration [1]:  MapPartition (PythonMap) (1/1)

The program does not proceed past that point and appears to hang without any 
errors.

> Iteration support in Python API
> ---
>
> Key: FLINK-4098
> URL: https://issues.apache.org/jira/browse/FLINK-4098
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Affects Versions: 1.0.2
>Reporter: Geoffrey Mon
>Priority: Minor
>
> Bulk and delta iterations are not supported in the Python API. Currently 
> working on this at https://github.com/GEOFBOT/flink





[jira] [Commented] (FLINK-4093) Expose metric interfaces

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346598#comment-15346598
 ] 

ASF GitHub Bot commented on FLINK-4093:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2134#discussion_r68255328
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 ---
@@ -146,7 +147,16 @@ public Counter counter(int name) {
 
@Override
public Counter counter(String name) {
-   Counter counter = new Counter();
+   return counter(name, new SimpleCounter());
+   }
+   
+   @Override
+   public <C extends Counter> C counter(int name, C counter) {
--- End diff --

users can write ```MyCounter c = metricGroup.addCounter("myC", new 
MyCounter());```


> Expose metric interfaces
> 
>
> Key: FLINK-4093
> URL: https://issues.apache.org/jira/browse/FLINK-4093
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>






[GitHub] flink pull request #2134: [FLINK-4093] Expose metric interfaces

2016-06-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2134#discussion_r68255328
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 ---
@@ -146,7 +147,16 @@ public Counter counter(int name) {
 
@Override
public Counter counter(String name) {
-   Counter counter = new Counter();
+   return counter(name, new SimpleCounter());
+   }
+   
+   @Override
+   public <C extends Counter> C counter(int name, C counter) {
--- End diff --

users can write ```MyCounter c = metricGroup.addCounter("myC", new 
MyCounter());```




[jira] [Commented] (FLINK-3319) Add or operator to CEP's pattern API

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346584#comment-15346584
 ] 

ASF GitHub Bot commented on FLINK-3319:
---

GitHub user thormanrd opened a pull request:

https://github.com/apache/flink/pull/2155

[FLINK-3319] Added Or function for where clause in CEP Pattern

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/thormanrd/flink feature/FLINK-3319

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2155.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2155


commit e71514574e6e7d5623eaf4e556d708c642ccb6f2
Author: Bob Thorman 
Date:   2016-06-16T12:48:53Z

FLINT-3839 Added the extraction of jar files from a URL spec that is a 
directory in the form of file:///path/to/jars/<*>

commit 5705962613ea0e64bfbdfa1b544c3fd8870a4540
Author: Bob Thorman 
Date:   2016-06-22T00:54:02Z

FLINK-3319 Added OrFilterFunction class and the or method to the Pattern 
class

commit 771f7029571db0345e0713c93f914ba945690c97
Author: Bob Thorman 
Date:   2016-06-22T13:23:48Z

FLINK-3319 Added or method to scala Pattern class




> Add or operator to CEP's pattern API
> 
>
> Key: FLINK-3319
> URL: https://issues.apache.org/jira/browse/FLINK-3319
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Robert Thorman
>Priority: Minor
>
> Adding an {{or}} operator to CEP's pattern API would be beneficial. This 
> would considerably extend the set of supported patterns. The {{or}} operator 
> lets you define multiple succeeding pattern states for the next stage.
> {code}
> Pattern.begin("start").next("middle1").where(...).or("middle2").where(...)
> {code}
> In order to implement the {{or}} operator, one has to extend the {{Pattern}} 
> class. Furthermore, the {{NFACompiler}} has to be extended to generate two 
> resulting pattern states in case of an {{or}} operator.
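What the `or` combination boils down to can be sketched with a `FilterFunction`-style single-method interface. This is an illustrative stand-in; the actual CEP condition classes in the pull request may differ:

```java
// Minimal sketch of OR-combining two filter conditions for a pattern state.
public class OrFilterDemo {

    interface FilterFunction<T> {
        boolean filter(T value);
    }

    static class OrFilterFunction<T> implements FilterFunction<T> {
        private final FilterFunction<T> left;
        private final FilterFunction<T> right;

        OrFilterFunction(FilterFunction<T> left, FilterFunction<T> right) {
            this.left = left;
            this.right = right;
        }

        public boolean filter(T value) {
            // The event matches the pattern state if either condition holds.
            return left.filter(value) || right.filter(value);
        }
    }

    public static void main(String[] args) {
        FilterFunction<Integer> isNegative = v -> v < 0;
        FilterFunction<Integer> isLarge = v -> v > 100;
        FilterFunction<Integer> either = new OrFilterFunction<>(isNegative, isLarge);
        System.out.println(either.filter(-5));  // true
        System.out.println(either.filter(50));  // false
        System.out.println(either.filter(500)); // true
    }
}
```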





[GitHub] flink pull request #2155: [FLINK-3319] Added Or function for where clause in...

2016-06-23 Thread thormanrd
GitHub user thormanrd opened a pull request:

https://github.com/apache/flink/pull/2155

[FLINK-3319] Added Or function for where clause in CEP Pattern

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/thormanrd/flink feature/FLINK-3319

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2155.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2155


commit e71514574e6e7d5623eaf4e556d708c642ccb6f2
Author: Bob Thorman 
Date:   2016-06-16T12:48:53Z

FLINT-3839 Added the extraction of jar files from a URL spec that is a 
directory in the form of file:///path/to/jars/<*>

commit 5705962613ea0e64bfbdfa1b544c3fd8870a4540
Author: Bob Thorman 
Date:   2016-06-22T00:54:02Z

FLINK-3319 Added OrFilterFunction class and the or method to the Pattern 
class

commit 771f7029571db0345e0713c93f914ba945690c97
Author: Bob Thorman 
Date:   2016-06-22T13:23:48Z

FLINK-3319 Added or method to scala Pattern class






[jira] [Commented] (FLINK-4093) Expose metric interfaces

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346547#comment-15346547
 ] 

ASF GitHub Bot commented on FLINK-4093:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2134
  
The changes look good to me. +1 for merging. Good work @zentol. I only had 
a question concerning the use of generic parameters which is not really 
necessary, imo.


> Expose metric interfaces
> 
>
> Key: FLINK-4093
> URL: https://issues.apache.org/jira/browse/FLINK-4093
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>






[GitHub] flink issue #2134: [FLINK-4093] Expose metric interfaces

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2134
  
The changes look good to me. +1 for merging. Good work @zentol. I only had 
a question concerning the use of generic parameters which is not really 
necessary, imo.




[jira] [Commented] (FLINK-4093) Expose metric interfaces

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346545#comment-15346545
 ] 

ASF GitHub Bot commented on FLINK-4093:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2134#discussion_r68248549
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 ---
@@ -146,7 +147,16 @@ public Counter counter(int name) {
 
@Override
public Counter counter(String name) {
-   Counter counter = new Counter();
+   return counter(name, new SimpleCounter());
+   }
+   
+   @Override
+   public <C extends Counter> C counter(int name, C counter) {
--- End diff --

What's the advantage of introducing the bounded generic parameter over 
simply having a `Counter` parameter?


> Expose metric interfaces
> 
>
> Key: FLINK-4093
> URL: https://issues.apache.org/jira/browse/FLINK-4093
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>






[GitHub] flink pull request #2134: [FLINK-4093] Expose metric interfaces

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2134#discussion_r68248549
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 ---
@@ -146,7 +147,16 @@ public Counter counter(int name) {
 
@Override
public Counter counter(String name) {
-   Counter counter = new Counter();
+   return counter(name, new SimpleCounter());
+   }
+   
+   @Override
+   public <C extends Counter> C counter(int name, C counter) {
--- End diff --

What's the advantage of introducing the bounded generic parameter over 
simply having a `Counter` parameter?




[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

2016-06-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2105#discussion_r68247987
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
@@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
 
if (reporter instanceof Scheduled) {
LOG.info("Periodically reporting 
metrics in intervals of {} {}", period, timeunit.name());
-   long millis = timeunit.toMillis(period);

-   timer = new java.util.Timer("Periodic 
Metrics Reporter", true);
-   timer.schedule(new 
ReporterTask((Scheduled) reporter), millis, millis);
+   executor.scheduleWithFixedDelay(new 
ReporterTask((Scheduled) reporter), period, period, timeunit);
}
else {
-   timer = null;
+   executor = null;
}
}
catch (Throwable t) {
reporter = new JMXReporter();
-   timer = null;
+   executor = null;
LOG.error("Could not instantiate custom metrics 
reporter. Defaulting to JMX metrics export.", t);
}
 
this.reporter = reporter;
-   this.timer = timer;
+   this.executor = executor;
}
}
 
/**
 * Shuts down this registry and the associated {@link 
org.apache.flink.metrics.reporter.MetricReporter}.
 */
public void shutdown() {
-   if (timer != null) {
-   timer.cancel();
-   }
if (reporter != null) {
try {
reporter.close();
} catch (Throwable t) {
LOG.warn("Metrics reporter did not shut down 
cleanly", t);
}
}
+   if (executor != null) {
+   executor.shutdownNow();
--- End diff --

*correction* all reporters _that regularly send out reports_ follow the 
same logic.
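For context, the `Timer` to `ScheduledExecutorService` migration in the diff above can be sketched in isolation. This is a simplified stand-alone illustration, not the Flink code itself:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicReporterSketch {
    public static void main(String[] args) throws InterruptedException {
        // Daemon thread, mirroring the old `new Timer("Periodic Metrics Reporter", true)`.
        ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "Periodic Metrics Reporter");
                t.setDaemon(true);
                return t;
            });

        // Stand-in for ReporterTask: periodically pushes out a metrics report.
        Runnable reporterTask = () -> System.out.println("report");

        // Fixed *delay*: the next run is scheduled relative to the end of the
        // previous one, so a slow report pushes the schedule back instead of
        // stacking up runs.
        executor.scheduleWithFixedDelay(reporterTask, 50, 50, TimeUnit.MILLISECONDS);

        Thread.sleep(200);

        // Unlike Timer.cancel(), which lets a currently executing task run to
        // completion, shutdownNow() also interrupts the in-flight report --
        // which is what keeps a slow reporter from blocking shutdown.
        executor.shutdownNow();
    }
}
```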




[jira] [Commented] (FLINK-4074) Reporter can block TaskManager shutdown

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346541#comment-15346541
 ] 

ASF GitHub Bot commented on FLINK-4074:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2105#discussion_r68247987
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
@@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
 
if (reporter instanceof Scheduled) {
LOG.info("Periodically reporting 
metrics in intervals of {} {}", period, timeunit.name());
-   long millis = timeunit.toMillis(period);

-   timer = new java.util.Timer("Periodic 
Metrics Reporter", true);
-   timer.schedule(new 
ReporterTask((Scheduled) reporter), millis, millis);
+   executor.scheduleWithFixedDelay(new 
ReporterTask((Scheduled) reporter), period, period, timeunit);
}
else {
-   timer = null;
+   executor = null;
}
}
catch (Throwable t) {
reporter = new JMXReporter();
-   timer = null;
+   executor = null;
LOG.error("Could not instantiate custom metrics 
reporter. Defaulting to JMX metrics export.", t);
}
 
this.reporter = reporter;
-   this.timer = timer;
+   this.executor = executor;
}
}
 
/**
 * Shuts down this registry and the associated {@link 
org.apache.flink.metrics.reporter.MetricReporter}.
 */
public void shutdown() {
-   if (timer != null) {
-   timer.cancel();
-   }
if (reporter != null) {
try {
reporter.close();
} catch (Throwable t) {
LOG.warn("Metrics reporter did not shut down 
cleanly", t);
}
}
+   if (executor != null) {
+   executor.shutdownNow();
--- End diff --

*correction* all reporters _that regularly send out reports_ follow the 
same logic.


> Reporter can block TaskManager shutdown
> ---
>
> Key: FLINK-4074
> URL: https://issues.apache.org/jira/browse/FLINK-4074
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>
> If a report is being submitted while a TaskManager is shutting down the 
> reporter can cause the shutdown to be delayed since it submits the complete 
> report.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346488#comment-15346488
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2110
  
Excellent work @aljoscha. Great fix for the problem and it's also really 
nice that we could get rid of the IT case :-) +1 for merging after addressing 
my minor comments.


> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> 
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: B Wyatt
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
> .map(MapFunction<A, B>...)
> .addSink(...);
> input
> .map(MapFunction<A, C>...)
> .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of 
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}} 
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which 
> mutates the value stored in the StreamRecord<>.  
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the 
> {{StreamRecord<A>}} to the second map operation it is actually a 
> {{StreamRecord<B>}} and behaves as if the two map operations were serial 
> instead of parallel.
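The failure mechanism described above can be reproduced in miniature. The classes below are simplified stand-ins, not the actual Flink types:

```java
// With object reuse enabled, one record instance is handed to several
// chained operators; the first operator's replace() mutates it in place,
// so the second operator sees the wrong payload type at runtime.
public class ReuseBugDemo {
    static class StreamRecord {
        Object value;
        StreamRecord replace(Object v) { this.value = v; return this; }
    }

    public static void main(String[] args) {
        StreamRecord shared = new StreamRecord();
        shared.value = "an A";   // record enters the chain holding an A (a String)

        // First chained map: mutates the shared record in place.
        shared.replace(123);     // now holds a B (an Integer)

        // Second chained map still receives the same instance and
        // expects an A -> ClassCastException at runtime.
        try {
            String a = (String) shared.value;
            System.out.println(a);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException caught");
        }
    }
}
```

The fix in the pull request avoids this by giving each downstream operator a shallow copy of the record instead of the shared instance.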





[GitHub] flink issue #2110: [FLINK-3974] Fix object reuse with multi-chaining

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2110
  
Excellent work @aljoscha. Great fix for the problem and it's also really 
nice that we could get rid of the IT case :-) +1 for merging after addressing 
my minor comments.




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346486#comment-15346486
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68241018
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ChainingITCase.java
 ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.contains;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for stream operator chaining behaviour.
+ */
+public class ChainingITCase {
--- End diff --

I think that this test case no longer needs to be an IT case.



[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68241018
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ChainingITCase.java
 ---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.contains;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for stream operator chaining behaviour.
+ */
+public class ChainingITCase {
--- End diff --

I think that this test case no longer needs to be an IT case.




[GitHub] flink pull request #2105: [FLINK-4074] Make metric reporters less blocking

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2105#discussion_r68238993
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
@@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
 
if (reporter instanceof Scheduled) {
LOG.info("Periodically reporting 
metrics in intervals of {} {}", period, timeunit.name());
-   long millis = timeunit.toMillis(period);

-   timer = new java.util.Timer("Periodic 
Metrics Reporter", true);
-   timer.schedule(new 
ReporterTask((Scheduled) reporter), millis, millis);
+   executor.scheduleWithFixedDelay(new 
ReporterTask((Scheduled) reporter), period, period, timeunit);
}
else {
-   timer = null;
+   executor = null;
}
}
catch (Throwable t) {
reporter = new JMXReporter();
-   timer = null;
+   executor = null;
LOG.error("Could not instantiate custom metrics 
reporter. Defaulting to JMX metrics export.", t);
}
 
this.reporter = reporter;
-   this.timer = timer;
+   this.executor = executor;
}
}
 
/**
 * Shuts down this registry and the associated {@link 
org.apache.flink.metrics.reporter.MetricReporter}.
 */
public void shutdown() {
-   if (timer != null) {
-   timer.cancel();
-   }
if (reporter != null) {
try {
reporter.close();
} catch (Throwable t) {
LOG.warn("Metrics reporter did not shut down 
cleanly", t);
}
}
+   if (executor != null) {
+   executor.shutdownNow();
--- End diff --

Alright. But not all reporters use the same logic, since we have 
`Scheduled` and non-scheduled reporters, don't we?




[jira] [Commented] (FLINK-4062) Update Windowing Documentation

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346465#comment-15346465
 ] 

ASF GitHub Bot commented on FLINK-4062:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2154

[FLINK-4062] Update Windowing Documentation

R: @uce and @kl0u for review



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink window/doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2154.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2154


commit 6e0439dffd35c0d924f0ee03d1e075ab6a762989
Author: Aljoscha Krettek 
Date:   2016-06-23T08:23:02Z

[FLINK-4062] Update Windowing Documentation




> Update Windowing Documentation
> --
>
> Key: FLINK-4062
> URL: https://issues.apache.org/jira/browse/FLINK-4062
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> The window documentation could be a bit more principled and also needs 
> updating with the new allowed lateness setting.
> There is also essentially no documentation about how to write a custom 
> trigger.





[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346484#comment-15346484
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68240257
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -386,4 +402,23 @@ public void close() {
}
}
}
+
+   /**
+    * Special version of {@link BroadcastingOutputCollector} that performs a shallow copy of the
+    * {@link StreamRecord} to ensure that multi-chaining works correctly.
+    */
+   private static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
+
+       public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
+           super(outputs);
+       }
+
+       @Override
+       public void collect(StreamRecord<T> record) {
+           for (Output<StreamRecord<T>> output : outputs) {
+               StreamRecord<T> shallowCopy = record.copy(record.getValue());
+               output.collect(shallowCopy);
+           }
--- End diff --

Here, the same. I think we could save one copying operation by giving the 
original `record` to the last output.
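The suggested optimization, copying for every output except the last one, which can receive the original record, might look like the sketch below. The nested classes are toy stand-ins for Flink's `StreamRecord`/`Output`, not the real types:

```java
import java.util.ArrayList;
import java.util.List;

public class LastOutputDemo {
    static class StreamRecord<T> {
        T value;
        StreamRecord(T value) { this.value = value; }
        StreamRecord<T> copy(T v) { return new StreamRecord<>(v); }
    }

    interface Output<T> { void collect(T record); }

    // Copy for all outputs except the last; the last output may consume the
    // original record directly, saving one allocation per record without
    // losing isolation between sibling chains.
    static <T> void broadcast(StreamRecord<T> record, List<Output<StreamRecord<T>>> outputs) {
        for (int i = 0; i < outputs.size() - 1; i++) {
            outputs.get(i).collect(record.copy(record.value));
        }
        if (!outputs.isEmpty()) {
            outputs.get(outputs.size() - 1).collect(record);
        }
    }

    public static void main(String[] args) {
        List<StreamRecord<Integer>> seen = new ArrayList<>();
        List<Output<StreamRecord<Integer>>> outs = new ArrayList<>();
        outs.add(seen::add);
        outs.add(seen::add);
        StreamRecord<Integer> r = new StreamRecord<>(42);
        broadcast(r, outs);
        // First output received a copy; last output received the original instance.
        System.out.println(seen.get(0) != r); // true
        System.out.println(seen.get(1) == r); // true
    }
}
```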







[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68240257
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -386,4 +402,23 @@ public void close() {
}
}
}
+
+   /**
+    * Special version of {@link BroadcastingOutputCollector} that performs a shallow copy of the
+    * {@link StreamRecord} to ensure that multi-chaining works correctly.
+    */
+   private static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
+
+       public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
+           super(outputs);
+       }
+
+       @Override
+       public void collect(StreamRecord<T> record) {
+           for (Output<StreamRecord<T>> output : outputs) {
+               StreamRecord<T> shallowCopy = record.copy(record.getValue());
+               output.collect(shallowCopy);
+           }
--- End diff --

Here, the same. I think we could save one copying operation by giving the 
original `record` to the last output.




[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68239970
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Special version of {@link DirectedOutput} that performs a shallow copy of the
+ * {@link StreamRecord} to ensure that multi-chaining works correctly.
+ */
+public class CopyingDirectedOutput<OUT> extends DirectedOutput<OUT> {
+
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public CopyingDirectedOutput(
+           List<OutputSelector<OUT>> outputSelectors,
+           List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
+       super(outputSelectors, outputs);
+   }
+
+   @Override
+   public void collect(StreamRecord<OUT> record) {
+       Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
+
+       for (Output<StreamRecord<OUT>> out : selectedOutputs) {
+           StreamRecord<OUT> shallowCopy = record.copy(record.getValue());
+           out.collect(shallowCopy);
+       }
--- End diff --

Can't we save one copy operation by giving `record` to the last selected 
output?




[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346476#comment-15346476
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2110#discussion_r68239970
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Special version of {@link DirectedOutput} that performs a shallow copy of the
+ * {@link StreamRecord} to ensure that multi-chaining works correctly.
+ */
+public class CopyingDirectedOutput<OUT> extends DirectedOutput<OUT> {
+
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public CopyingDirectedOutput(
+           List<OutputSelector<OUT>> outputSelectors,
+           List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
+       super(outputSelectors, outputs);
+   }
+
+   @Override
+   public void collect(StreamRecord<OUT> record) {
+       Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
+
+       for (Output<StreamRecord<OUT>> out : selectedOutputs) {
+           StreamRecord<OUT> shallowCopy = record.copy(record.getValue());
+           out.collect(shallowCopy);
+       }
--- End diff --

Can't we save one copy operation by giving `record` to the last selected 
output?







[GitHub] flink pull request #2154: [FLINK-4062] Update Windowing Documentation

2016-06-23 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2154

[FLINK-4062] Update Windowing Documentation

R: @uce and @kl0u for review



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink window/doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2154.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2154


commit 6e0439dffd35c0d924f0ee03d1e075ab6a762989
Author: Aljoscha Krettek 
Date:   2016-06-23T08:23:02Z

[FLINK-4062] Update Windowing Documentation






[jira] [Commented] (FLINK-4074) Reporter can block TaskManager shutdown

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346469#comment-15346469
 ] 

ASF GitHub Bot commented on FLINK-4074:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2105#discussion_r68238993
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/MetricRegistry.java ---
@@ -118,40 +120,38 @@ public MetricRegistry(Configuration config) {
 
if (reporter instanceof Scheduled) {
LOG.info("Periodically reporting 
metrics in intervals of {} {}", period, timeunit.name());
-   long millis = timeunit.toMillis(period);

-   timer = new java.util.Timer("Periodic 
Metrics Reporter", true);
-   timer.schedule(new 
ReporterTask((Scheduled) reporter), millis, millis);
+   executor.scheduleWithFixedDelay(new 
ReporterTask((Scheduled) reporter), period, period, timeunit);
}
else {
-   timer = null;
+   executor = null;
}
}
catch (Throwable t) {
reporter = new JMXReporter();
-   timer = null;
+   executor = null;
LOG.error("Could not instantiate custom metrics 
reporter. Defaulting to JMX metrics export.", t);
}
 
this.reporter = reporter;
-   this.timer = timer;
+   this.executor = executor;
}
}
 
/**
 * Shuts down this registry and the associated {@link 
org.apache.flink.metrics.reporter.MetricReporter}.
 */
public void shutdown() {
-   if (timer != null) {
-   timer.cancel();
-   }
if (reporter != null) {
try {
reporter.close();
} catch (Throwable t) {
LOG.warn("Metrics reporter did not shut down 
cleanly", t);
}
}
+   if (executor != null) {
+   executor.shutdownNow();
--- End diff --

Alright. But not all reporters use the same logic, since we have 
`Scheduled` and non-scheduled reporters, don't we?







[jira] [Commented] (FLINK-4087) JMXReporter can't handle port conflicts

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346461#comment-15346461
 ] 

ASF GitHub Bot commented on FLINK-4087:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2145#discussion_r68238504
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---
@@ -73,10 +86,61 @@ public JMXReporter() {
// 

 
@Override
-   public void open(Configuration config) {}
+   public void open(Configuration config) {
+   this.jmxServer = startJmxServer(config);
+   }
+
+   private static JMXServer startJmxServer(Configuration config) {
+   JMXServer jmxServer;
+
+   String portRange = config.getString(KEY_METRICS_JMX_PORT, 
"9010-9025");
+   String[] ports = portRange.split("-");
+
+   if (ports.length == 0 || ports.length > 2) {
+   throw new IllegalArgumentException("JMX port range was configured incorrectly. " +
+   "Expected: [-] Configured: " + portRange);
+   }
+
+   if (ports.length == 1) { //single port was configured
+   int port = Integer.parseInt(ports[0]);
+   jmxServer = new JMXServer(port);
+   try {
+   jmxServer.start();
+   } catch (IOException e) {
+   throw new RuntimeException("Could not start JMX server on port " + port + ".");
+   }
+   return jmxServer;
+   } else { //port range was configured
+   int start = Integer.parseInt(ports[0]);
+   int end = Integer.parseInt(ports[1]);
+   while (true) {
+   try {
+   jmxServer = new JMXServer(start);
+   jmxServer.start();
+   LOG.info("Starting JMX on port " + start + ".");
--- End diff --

Sure, but it might be interesting to see explicitly which ports were tried, 
imho.


> JMXReporter can't handle port conflicts
> ---
>
> Key: FLINK-4087
> URL: https://issues.apache.org/jira/browse/FLINK-4087
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.1.0
>
>
> The JMXReporter is currently configured to use a single port that is set as a 
> JVM argument.
> This approach has a few disadvantages:
> If multiple TaskManagers run on the same machine only 1 can expose metrics. 
> This issue is compounded by the upcoming JobManager metrics, which would then 
> prevent TM metrics from being exposed in local setups.
> Currently, we prevent other TMs from exposing metrics by checking in the 
> start-daemon-sh script whether a TM is already running, and if so clearing the 
> arguments. This isn't a particularly safe way to do it, and this script is not 
> used when deploying on YARN, leading to TM failures since the JVM can't 
> allocate the JMX port.
> We should find a way to specify port ranges for JMX and log the final port 
> used.
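The pull request reviewed above resolves this by walking the configured range and binding the first free port. A self-contained sketch of that pattern, with a plain `ServerSocket` standing in for the actual `JMXServer` (names here are illustrative):

```java
import java.io.IOException;
import java.net.ServerSocket;

/** Sketch: parse a "start-end" port range and bind the first free port in it. */
public class PortRangeSketch {

    /** Accepts either a single port "9010" or a range "9010-9025". */
    public static int[] parseRange(String spec) {
        String[] parts = spec.split("-");
        if (parts.length == 0 || parts.length > 2) {
            throw new IllegalArgumentException("Expected a port or start-end range, got: " + spec);
        }
        int start = Integer.parseInt(parts[0]);
        int end = parts.length == 2 ? Integer.parseInt(parts[1]) : start;
        return new int[] {start, end};
    }

    /** Tries each port in order; returns the bound socket on the first success. */
    public static ServerSocket bindFirstFree(String spec) throws IOException {
        int[] range = parseRange(spec);
        IOException last = null;
        for (int port = range[0]; port <= range[1]; port++) {
            try {
                // Real code would log the attempted port here, as discussed in the review.
                return new ServerSocket(port);
            } catch (IOException e) {
                last = e; // port in use, try the next one
            }
        }
        throw new IOException("No free port in range " + spec, last);
    }
}
```

Logging each attempted port (not just the final one) makes conflicts on shared machines much easier to diagnose, which is the point raised in the review comment.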



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2145: [FLINK-4087] [metrics] Improved JMX port handling

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2145#discussion_r68238504
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---
@@ -73,10 +86,61 @@ public JMXReporter() {
// 

 
@Override
-   public void open(Configuration config) {}
+   public void open(Configuration config) {
+   this.jmxServer = startJmxServer(config);
+   }
+
+   private static JMXServer startJmxServer(Configuration config) {
+   JMXServer jmxServer;
+
+   String portRange = config.getString(KEY_METRICS_JMX_PORT, 
"9010-9025");
+   String[] ports = portRange.split("-");
+
+   if (ports.length == 0 || ports.length > 2) {
+   throw new IllegalArgumentException("JMX port range was configured incorrectly. " +
+   "Expected: [-] Configured: " + portRange);
+   }
+
+   if (ports.length == 1) { //single port was configured
+   int port = Integer.parseInt(ports[0]);
+   jmxServer = new JMXServer(port);
+   try {
+   jmxServer.start();
+   } catch (IOException e) {
+   throw new RuntimeException("Could not start JMX server on port " + port + ".");
+   }
+   return jmxServer;
+   } else { //port range was configured
+   int start = Integer.parseInt(ports[0]);
+   int end = Integer.parseInt(ports[1]);
+   while (true) {
+   try {
+   jmxServer = new JMXServer(start);
+   jmxServer.start();
+   LOG.info("Starting JMX on port " + start + ".");
--- End diff --

Sure, but it might be interesting to see explicitly which ports were tried, 
imho.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4087) JMXReporter can't handle port conflicts

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346459#comment-15346459
 ] 

ASF GitHub Bot commented on FLINK-4087:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2145#discussion_r68238384
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---
@@ -265,4 +329,72 @@ public Object getValue() {
return gauge.getValue();
}
}
+
+   /**
+* JMX Server implementation that JMX clients can connect to.
+*
+* Heavily based on j256 simplejmx project
+*
+* 
https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
+*/
+   private static class JMXServer {
+   private int port;
+   private Registry rmiRegistry;
--- End diff --

Hmm, if we don't understand the code and, thus, cannot explain what it 
does, we shouldn't include it. The question is who should maintain this kind 
of code. I think it would be best if you could do some research to clarify how 
the `JMXServer` works and add some comments explaining the different components.


> JMXReporter can't handle port conflicts
> ---
>
> Key: FLINK-4087
> URL: https://issues.apache.org/jira/browse/FLINK-4087
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.1.0
>
>
> The JMXReporter is currently configured to use a single port that is set as a 
> JVM argument.
> This approach has a few disadvantages:
> If multiple TaskManagers run on the same machine only 1 can expose metrics. 
> This issue is compounded by the upcoming JobManager metrics, which would then 
> prevent TM metrics from being exposed in local setups.
> Currently, we prevent other TMs from exposing metrics by checking in the 
> start-daemon-sh script whether a TM is already running, and if so clearing the 
> arguments. This isn't a particularly safe way to do it, and this script is not 
> used when deploying on YARN, leading to TM failures since the JVM can't 
> allocate the JMX port.
> We should find a way to specify port ranges for JMX and log the final port 
> used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2145: [FLINK-4087] [metrics] Improved JMX port handling

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2145#discussion_r68238384
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/metrics/reporter/JMXReporter.java ---
@@ -265,4 +329,72 @@ public Object getValue() {
return gauge.getValue();
}
}
+
+   /**
+* JMX Server implementation that JMX clients can connect to.
+*
+* Heavily based on j256 simplejmx project
+*
+* 
https://github.com/j256/simplejmx/blob/master/src/main/java/com/j256/simplejmx/server/JmxServer.java
+*/
+   private static class JMXServer {
+   private int port;
+   private Registry rmiRegistry;
--- End diff --

Hmm, if we don't understand the code and, thus, cannot explain what it 
does, we shouldn't include it. The question is who should maintain this kind 
of code. I think it would be best if you could do some research to clarify how 
the `JMXServer` works and add some comments explaining the different components.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4110) Provide testing skeleton in quickstarts

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346457#comment-15346457
 ] 

ASF GitHub Bot commented on FLINK-4110:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2153

[FLINK-4110] Add testing skeleton to quickstart

With this change, our quickstart archetypes will also contain some sample 
code for bringing up an embedded server for testing purposes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink4110

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2153


commit e887903d4efd01cff1dcea4dee279c927064a1a4
Author: Robert Metzger 
Date:   2016-06-23T13:35:18Z

[FLINK-4110] Add testing skeleton to quickstart




> Provide testing skeleton in quickstarts
> ---
>
> Key: FLINK-4110
> URL: https://issues.apache.org/jira/browse/FLINK-4110
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Quickstarts
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>
> Users often ask how they can test their Flink code.
> I suggest providing a very simple skeleton / example in the quickstarts, 
> showing how it's done.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2153: [FLINK-4110] Add testing skeleton to quickstart

2016-06-23 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2153

[FLINK-4110] Add testing skeleton to quickstart

With this change, our quickstart archetypes will also contain some sample 
code for bringing up an embedded server for testing purposes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink4110

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2153


commit e887903d4efd01cff1dcea4dee279c927064a1a4
Author: Robert Metzger 
Date:   2016-06-23T13:35:18Z

[FLINK-4110] Add testing skeleton to quickstart




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2152: [FLINK-3920] Distributed Linear Algebra: block-bas...

2016-06-23 Thread chobeat
GitHub user chobeat opened a pull request:

https://github.com/apache/flink/pull/2152

[FLINK-3920] Distributed Linear Algebra: block-based matrix

Second part of the distributed linear algebra contribution. This  PR 
introduces block-partitioned matrices, operations on them (multiplication, 
per-row operations) and conversions from and to row-partitioned matrices.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/radicalbit/flink FLINK-3920

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2152


commit c59125272cc97be21bfe303fe2bb0cdd70c81915
Author: chobeat 
Date:   2016-06-23T12:37:13Z

BlockMatrix




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3920) Distributed Linear Algebra: block-based matrix

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346435#comment-15346435
 ] 

ASF GitHub Bot commented on FLINK-3920:
---

GitHub user chobeat opened a pull request:

https://github.com/apache/flink/pull/2152

[FLINK-3920] Distributed Linear Algebra: block-based matrix

Second part of the distributed linear algebra contribution. This  PR 
introduces block-partitioned matrices, operations on them (multiplication, 
per-row operations) and conversions from and to row-partitioned matrices.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/radicalbit/flink FLINK-3920

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2152


commit c59125272cc97be21bfe303fe2bb0cdd70c81915
Author: chobeat 
Date:   2016-06-23T12:37:13Z

BlockMatrix




> Distributed Linear Algebra: block-based matrix
> --
>
> Key: FLINK-3920
> URL: https://issues.apache.org/jira/browse/FLINK-3920
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Simone Robutti
>Assignee: Simone Robutti
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job status

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2096
  
Will be merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job status

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2096
  
Thanks for your thorough review @uce.

- We have talked offline about the status in the web interface, and it 
turned out not to be a problem since the jobs are directly removed from 
`currentJobs` in the `JobManager` when `cancelAndClearEverything` is called. 
This implies that the jobs will no longer be shown in the web interface.

- I agree that it is a good idea to replace the `SuppressRestartException` 
by a mechanism to disable the `RestartStrategies`. I've opened a [JIRA 
issue](https://issues.apache.org/jira/browse/FLINK-4112) to keep track of the 
effort. I think, however, that the mechanism should be implemented as part of 
the work on this issue. That way we won't mix pull requests with each other.

- You're right. I've addressed the comment and added `SUSPENDED` to the set 
of states which don't throw an exception when encountered in 
`ExecutionGraph.restart`.

- You're right concerning the `JobManagerProcess` tests. Maybe we can 
refactor some of them in the future by applying a similar pattern as it was 
used in `JobManagerHARecoveryTest`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346404#comment-15346404
 ] 

ASF GitHub Bot commented on FLINK-3800:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2096
  
Will be merging this PR.


> ExecutionGraphs can become orphans
> --
>
> Key: FLINK-3800
> URL: https://issues.apache.org/jira/browse/FLINK-3800
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{JobManager.cancelAndClearEverything}} method fails all currently 
> executed jobs on the {{JobManager}} and then clears the list of 
> {{currentJobs}} kept in the JobManager. This can become problematic if the 
> user has set a restart strategy for a job, because the {{RestartStrategy}} 
> will try to restart the job. This can lead to unwanted re-deployments of the 
> job, which consume resources and thus disturb the execution of other 
> jobs. If the restart strategy never stops, this prevents the 
> {{ExecutionGraph}} from ever being properly terminated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346399#comment-15346399
 ] 

ASF GitHub Bot commented on FLINK-3800:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2096
  
Thanks for your thorough review @uce.

- We have talked offline about the status in the web interface, and it 
turned out not to be a problem since the jobs are directly removed from 
`currentJobs` in the `JobManager` when `cancelAndClearEverything` is called. 
This implies that the jobs will no longer be shown in the web interface.

- I agree that it is a good idea to replace the `SuppressRestartException` 
by a mechanism to disable the `RestartStrategies`. I've opened a [JIRA 
issue](https://issues.apache.org/jira/browse/FLINK-4112) to keep track of the 
effort. I think, however, that the mechanism should be implemented as part of 
the work on this issue. That way we won't mix pull requests with each other.

- You're right. I've addressed the comment and added `SUSPENDED` to the set 
of states which don't throw an exception when encountered in 
`ExecutionGraph.restart`.

- You're right concerning the `JobManagerProcess` tests. Maybe we can 
refactor some of them in the future by applying a similar pattern as it was 
used in `JobManagerHARecoveryTest`.


> ExecutionGraphs can become orphans
> --
>
> Key: FLINK-3800
> URL: https://issues.apache.org/jira/browse/FLINK-3800
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{JobManager.cancelAndClearEverything}} method fails all currently 
> executed jobs on the {{JobManager}} and then clears the list of 
> {{currentJobs}} kept in the JobManager. This can become problematic if the 
> user has set a restart strategy for a job, because the {{RestartStrategy}} 
> will try to restart the job. This can lead to unwanted re-deployments of the 
> job, which consume resources and thus disturb the execution of other 
> jobs. If the restart strategy never stops, this prevents the 
> {{ExecutionGraph}} from ever being properly terminated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4112) Replace SuppressRestartException with disabling RestartStrategy

2016-06-23 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4112:


 Summary: Replace SuppressRestartException with disabling 
RestartStrategy
 Key: FLINK-4112
 URL: https://issues.apache.org/jira/browse/FLINK-4112
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.1.0
Reporter: Till Rohrmann
Priority: Minor
 Fix For: 1.2.0


Currently, Flink uses {{SuppressRestartExceptions}} to suppress job restarts. 
This mechanism can be used by different components to control the lifecycle of 
a job. Since the control flow of exceptions can be quite difficult to retrace, I 
propose to replace this mechanism with an explicit {{RestartStrategy}} disable 
method. So instead of throwing a {{SuppressRestartException}}, we should disable 
the {{RestartStrategy}} to avoid further job restarts.
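A sketch of what such a disable method could look like (hypothetical API, not Flink's actual `RestartStrategy` interface): the coordinator flips a flag instead of throwing, and the strategy simply refuses further restarts.

```java
/**
 * Hypothetical sketch of the proposed mechanism: instead of throwing a
 * SuppressRestartException through the call stack, a component calls
 * disable() and the strategy stops granting restarts.
 */
public class DisableableRestartStrategy {

    private final int maxAttempts;
    private int attempts;
    private volatile boolean disabled; // replaces the exception-based suppression

    public DisableableRestartStrategy(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Called e.g. when the JobManager clears its jobs, to stop all future restarts. */
    public void disable() {
        disabled = true;
    }

    public boolean canRestart() {
        return !disabled && attempts < maxAttempts;
    }

    public void restart(Runnable restartAction) {
        if (!canRestart()) {
            throw new IllegalStateException("Restarts are disabled or exhausted");
        }
        attempts++;
        restartAction.run();
    }
}
```

The explicit flag makes the control flow visible at the call site, which is exactly the readability gain the issue argues for over exception-based suppression.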



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3965) Delegating GraphAlgorithm

2016-06-23 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346395#comment-15346395
 ] 

Vasia Kalavri commented on FLINK-3965:
--

The functionality is much needed in my opinion. My concern is what we expose to 
users and what we keep internal to Gelly. If the {{DelegatingGraphAlgorithm}} 
and the {{GraphAnalytic}} are intended for users, then we should make their 
functionalities and differences very clear in the docs, including examples. 
Maybe that can be done as part of FLINK-4104?

> Delegating GraphAlgorithm
> -
>
> Key: FLINK-3965
> URL: https://issues.apache.org/jira/browse/FLINK-3965
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Complex and related algorithms often overlap in computation of data. Two such 
> examples are:
> 1) the local and global clustering coefficients each use a listing of 
> triangles
> 2) the local clustering coefficient joins on vertex degree, and the 
> underlying triangle listing annotates edge degree which uses vertex degree
> We can reuse and rewrite algorithm output by creating a {{ProxyObject}} as a 
> delegate for method calls to the {{DataSet}} returned by the algorithm.
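The delegation idea can be illustrated with `java.lang.reflect.Proxy` (a simplified stand-in for the Gelly implementation, with `DataSet` replaced by an arbitrary interface): the algorithm hands out a proxy immediately, and the expensive result behind it is computed once, on first method call, and then shared.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

/**
 * Illustrative sketch of a delegating result: method calls on the proxy are
 * forwarded to a lazily computed target, so two algorithms that share an
 * intermediate result (e.g. a triangle listing) compute it only once.
 */
public class DelegatingResult {

    @SuppressWarnings("unchecked")
    public static <T> T delegate(Class<T> iface, Supplier<T> compute) {
        InvocationHandler handler = new InvocationHandler() {
            private T target; // computed on first use, then reused

            @Override
            public synchronized Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
                if (target == null) {
                    target = compute.get();
                }
                return m.invoke(target, args); // forward every call to the real result
            }
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

The real Gelly code must additionally decide when the delegated computation can be rewritten or merged; this sketch only shows the lazy, shared forwarding part.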



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3647) Change StreamSource to use Processing-Time Clock Service

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346364#comment-15346364
 ] 

ASF GitHub Bot commented on FLINK-3647:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2124#discussion_r68227329
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
 ---
@@ -27,10 +27,12 @@
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 
 /**
- * A {@link Trigger} that continuously fires based on a given time interval. The time is the current
- * system time.
+ * A {@link Trigger} that continuously fires based on a given time interval. The current (processing)
+ * time is provided by the {@link TimeServiceProvider}
--- End diff --

I think this is an implementation detail that should not necessarily be 
mentioned here.


> Change StreamSource to use Processing-Time Clock Service
> 
>
> Key: FLINK-3647
> URL: https://issues.apache.org/jira/browse/FLINK-3647
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>
> Currently, the {{StreamSource.AutomaticWatermarkContext}} has its own timer 
> service. This should be changed to use the Clock service introduced in 
> FLINK-3646 to make watermark emission testable by providing a custom Clock.
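The pluggable-clock idea behind this issue can be sketched in plain Java (hypothetical names, not the actual `TimeServiceProvider`): production code reads the system clock, while tests substitute a manually advanced clock so timer and watermark behavior becomes deterministic.

```java
/**
 * Sketch of a pluggable processing-time clock for testable timer logic.
 * Names are illustrative; Flink's real abstraction differs.
 */
public class ClockSketch {

    /** Minimal clock abstraction injected into time-dependent code. */
    public interface Clock {
        long currentTimeMillis();
    }

    /** Production clock: the real system time. */
    public static final Clock SYSTEM = System::currentTimeMillis;

    /** Test clock advanced explicitly by the test, never by wall time. */
    public static class ManualClock implements Clock {
        private long now;

        public void advance(long millis) {
            now += millis;
        }

        @Override
        public long currentTimeMillis() {
            return now;
        }
    }

    /** Example consumer: decides whether the watermark interval has elapsed. */
    public static boolean shouldEmitWatermark(Clock clock, long lastEmit, long interval) {
        return clock.currentTimeMillis() - lastEmit >= interval;
    }
}
```

With the manual clock a test can step time forward in exact increments and assert precisely when emission happens, instead of sleeping and hoping.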



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2124: [FLINK-3647] Change StreamSource to use Processing...

2016-06-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2124#discussion_r68227329
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
 ---
@@ -27,10 +27,12 @@
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 
 /**
- * A {@link Trigger} that continuously fires based on a given time interval. The time is the current
- * system time.
+ * A {@link Trigger} that continuously fires based on a given time interval. The current (processing)
+ * time is provided by the {@link TimeServiceProvider}
--- End diff --

I think this is an implementation detail that should not necessarily be 
mentioned here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3647) Change StreamSource to use Processing-Time Clock Service

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346360#comment-15346360
 ] 

ASF GitHub Bot commented on FLINK-3647:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2124#discussion_r68227049
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
 ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * A context provided to the {@link WindowAssigner} that allows it to 
query the
+ * current processing time. This is provided to the assigner by its 
containing
+ * {@link 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
+ * which, in turn, gets it from the containing
+ * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
+ */
+public abstract class WindowAssignerContext {
--- End diff --

This should be moved to `WindowAssigner`, similarly to how `TriggerContext` 
is an inner class of `Trigger`.


> Change StreamSource to use Processing-Time Clock Service
> 
>
> Key: FLINK-3647
> URL: https://issues.apache.org/jira/browse/FLINK-3647
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>
> Currently, the {{StreamSource.AutomaticWatermarkContext}} has its own timer 
> service. This should be changed to use the Clock service introduced in 
> FLINK-3646 to make watermark emission testable by providing a custom Clock.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3647) Change StreamSource to use Processing-Time Clock Service

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346356#comment-15346356
 ] 

ASF GitHub Bot commented on FLINK-3647:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2124#discussion_r68226912
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 ---
@@ -186,16 +199,19 @@ public DistributedCache getDistributedCache() {
public  ReducingState getReducingState(ReducingStateDescriptor stateProperties) {
throw new UnsupportedOperationException();
}
-   
+
+   public long getCurrentProcessingTime() {
+   Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
+   return timerService.getCurrentProcessingTime();
+   }
+
@Override
public ScheduledFuture registerTimer(final long time, final Triggerable target) {
-   if (timer == null) {
-   timer = Executors.newSingleThreadScheduledExecutor();
-   }
+   Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");

-   final long delay = Math.max(time - System.currentTimeMillis(), 0);
+   final long delay = Math.max(time - timerService.getCurrentProcessingTime(), 0);
--- End diff --

`timerService.registerTimer()` now takes an absolute timestamp, not a delay.


> Change StreamSource to use Processing-Time Clock Service
> 
>
> Key: FLINK-3647
> URL: https://issues.apache.org/jira/browse/FLINK-3647
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>
> Currently, the {{StreamSource.AutomaticWatermarkContext}} has its own timer 
> service. This should be changed to use the Clock service introduced in 
> FLINK-3646 to make watermark emission testable by providing a custom Clock.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2124: [FLINK-3647] Change StreamSource to use Processing...

2016-06-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2124#discussion_r68227049
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
 ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * A context provided to the {@link WindowAssigner} that allows it to 
query the
+ * current processing time. This is provided to the assigner by its 
containing
+ * {@link 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
+ * which, in turn, gets it from the containing
+ * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
+ */
+public abstract class WindowAssignerContext {
--- End diff --

This should be moved to `WindowAssigner`, similarly to how `TriggerContext` 
is an inner class of `Trigger`.
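
A small sketch (hypothetical names, not Flink's actual code) of the pattern suggested above: the context type nests inside the abstraction it serves, the way `TriggerContext` nests inside `Trigger`.

```java
public class InnerContextDemo {

    abstract static class WindowAssigner<T> {
        abstract String assign(T element, Context ctx);

        /** Context through which assigners query the current processing time. */
        abstract static class Context {
            abstract long getCurrentProcessingTime();
        }
    }

    public static void main(String[] args) {
        // A fixed-time context, standing in for the one the WindowOperator
        // would provide from its containing StreamTask.
        WindowAssigner.Context ctx = new WindowAssigner.Context() {
            @Override
            long getCurrentProcessingTime() { return 42L; }
        };
        WindowAssigner<String> assigner = new WindowAssigner<String>() {
            @Override
            String assign(String element, Context ctx) {
                return element + "@" + ctx.getCurrentProcessingTime();
            }
        };
        System.out.println(assigner.assign("a", ctx));
    }
}
```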


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2124: [FLINK-3647] Change StreamSource to use Processing...

2016-06-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2124#discussion_r68226912
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 ---
@@ -186,16 +199,19 @@ public DistributedCache getDistributedCache() {
public  ReducingState getReducingState(ReducingStateDescriptor 
stateProperties) {
throw new UnsupportedOperationException();
}
-   
+
+   public long getCurrentProcessingTime() {
+   Preconditions.checkNotNull(timerService, "The processing time 
timer has not been initialized.");
+   return timerService.getCurrentProcessingTime();
+   }
+
@Override
public ScheduledFuture registerTimer(final long time, final 
Triggerable target) {
-   if (timer == null) {
-   timer = Executors.newSingleThreadScheduledExecutor();
-   }
+   Preconditions.checkNotNull(timerService, "The processing time 
timer has not been initialized.");

-   final long delay = Math.max(time - System.currentTimeMillis(), 
0);
+   final long delay = Math.max(time - 
timerService.getCurrentProcessingTime(), 0);
--- End diff --

`timerService.registerTimer()` takes an absolute timestamp not a delay now.
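
The two scheduling conventions being mixed up here can be shown side by side; the values are made up for illustration:

```java
public class TimerConventionDemo {
    public static void main(String[] args) {
        long now = 1_000L;   // pretend current processing time
        long time = 1_250L;  // requested absolute firing time

        // Delay-based API: schedule after (time - now) milliseconds.
        long delay = Math.max(time - now, 0);
        System.out.println(delay);

        // Absolute-timestamp API (what the review comment says
        // registerTimer expects): pass `time` through unchanged.
        System.out.println(time);
    }
}
```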


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346340#comment-15346340
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2110
  
@tillrohrmann I pushed a commit that removes the per-operator object-reuse 
setting, refactors broadcasting and direct outputs and changes the ITCase to a 
test case. Happy reviewing.  


> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> 
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: B Wyatt
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
> .map(MapFunction...)
> .addSink(...);
> input
> .map(MapFunction...)
> .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of 
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output.collect}} 
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which 
> mutates the value stored in the StreamRecord<>.  
> As a result, when the {{Output.collect}} call passes the 
> {{StreamRecord<A>}} to the second map operation it is actually a 
> {{StreamRecord<B>}} and behaves as if the two map operations were serial 
> instead of parallel.
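
The aliasing described above can be sketched with a hypothetical minimal model (not Flink's actual classes): one record object is handed to two chained consumers, and the first consumer mutates it in place before the second reads it.

```java
public class ObjectReuseDemo {

    // Stand-in for Flink's StreamRecord: the payload is mutated in place,
    // so every downstream consumer shares the SAME object.
    static final class StreamRecord<T> {
        private Object value;

        StreamRecord(T initial) { this.value = initial; }

        @SuppressWarnings("unchecked")
        <X> StreamRecord<X> replace(X newValue) {
            this.value = newValue;
            return (StreamRecord<X>) (StreamRecord<?>) this;
        }

        Object getValue() { return value; }
    }

    public static void main(String[] args) {
        StreamRecord<Integer> record = new StreamRecord<>(21);

        // First "map" replaces the Integer payload with a String.
        record.replace("mapped:" + record.getValue());

        // Second "map" still sees the same object, whose payload is now a
        // String -- the point where the cast back to A would blow up.
        System.out.println(record.getValue().getClass().getSimpleName());
    }
}
```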



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2110: [FLINK-3974] Fix object reuse with multi-chaining

2016-06-23 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2110
  
@tillrohrmann I pushed a commit that removes the per-operator object-reuse 
setting, refactors broadcasting and direct outputs and changes the ITCase to a 
test case. Happy reviewing. 😃 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4046) Failing a restarting job can get stuck in JobStatus.FAILING

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346325#comment-15346325
 ] 

ASF GitHub Bot commented on FLINK-4046:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2095
  
Agreed. I had the same thought regarding checking for any final state right 
after posting this. ;) +1 to merge.


> Failing a restarting job can get stuck in JobStatus.FAILING
> ---
>
> Key: FLINK-4046
> URL: https://issues.apache.org/jira/browse/FLINK-4046
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> When a job is in state {{RESTARTING}}, then it can happen that all of its 
> {{ExecutionJobVertices}} are in a final state (if they have not been reset). 
> When calling {{fail}} on this {{ExecutionGraph}} will transition the state to 
> {{FAILING}} and call cancel on all {{ExecutionJobVertices}}. The job state 
> {{FAILING}} can only be left iff all {{ExecutionJobVertices}} have reached a 
> final state. The notification of this final state is only sent to the 
> {{ExecutionGraph}} when all subtasks of an {{ExecutionJobVertex}} have 
> transitioned to a final state. However, this won't happen because the 
> {{ExecutionJobVertices}} are already in a final state. The result is that a 
> job can get stuck in the state {{FAILING}} if {{fail}} is called on a 
> {{RESTARTING}} job.
> I propose to add a direct transition from {{RESTARTING}} to {{FAILED}} as it 
> is the case for the {{cancel}} call (transition from {{RESTARTING}} to 
> {{CANCELED}}).
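
The proposed transition can be sketched as a toy state table (assumed names, not Flink's actual code): `fail()` on a `RESTARTING` job jumps straight to `FAILED`, mirroring the existing `RESTARTING` -> `CANCELED` path.

```java
public class RestartingFailDemo {
    enum JobStatus { CREATED, RUNNING, RESTARTING, FAILING, FAILED, CANCELED }

    static JobStatus fail(JobStatus current) {
        switch (current) {
            case RESTARTING:
                return JobStatus.FAILED;   // proposed direct transition
            case RUNNING:
                return JobStatus.FAILING;  // usual path: cancel tasks first
            default:
                return current;            // terminal states are unchanged
        }
    }

    public static void main(String[] args) {
        System.out.println(fail(JobStatus.RESTARTING));
        System.out.println(fail(JobStatus.RUNNING));
    }
}
```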



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2095: [FLINK-4046] [runtime] Add direct state transition from R...

2016-06-23 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2095
  
Agreed. I had the same thought regarding checking for any final state right 
after posting this. ;) +1 to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4111) Flink Table & SQL doesn't work in very simple example

2016-06-23 Thread Jark Wu (JIRA)
Jark Wu created FLINK-4111:
--

 Summary: Flink Table & SQL doesn't work in very simple example
 Key: FLINK-4111
 URL: https://issues.apache.org/jira/browse/FLINK-4111
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.1.0
Reporter: Jark Wu
 Fix For: 1.1.0


I’m trying to use Flink Table 1.1-SNAPSHOT, where I want to use the Table API and 
SQL in my project. But when I run the very simple example WordCountTable, I 
encounter the following exception: 

{code}
Exception in thread "main" java.lang.NoSuchMethodError: 
org.apache.calcite.rel.logical.LogicalAggregate.getGroupSets()Lorg/apache/flink/shaded/com/google/common/collect/ImmutableList;
  at 
org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule.matches(DataSetAggregateRule.scala:47)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:269)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:253)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:1542)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1817)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:1038)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1058)
  at 
org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:723)
  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:331)
  at 
org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:250)
  at 
org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
  at 
org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
  at com.alibaba.flink.examples.WordCountTable$.main(WordCountTable.scala:43)
  at com.alibaba.flink.examples.WordCountTable.main(WordCountTable.scala)
{code}

It seems that something is wrong with our Guava shading. Do you have any ideas? 

My pom file and WordCountTable.scala are 
[here|https://gist.github.com/wuchong/9c1c0df3cb7453502abc4605f5347289]. 

And I found someone have the same problem on stack overflow 
[http://stackoverflow.com/questions/37835408/org-apache-flink-api-table-tableexception-alias-on-field-reference-expression-e#comment63160086_37838816]
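
One quick way to diagnose this kind of shading conflict is to print where the JVM actually loaded the conflicting class from. The probe below uses its own class for self-containment; in a real project you would substitute the conflicting class (e.g. Calcite's `LogicalAggregate`) to see whether the shaded or the unshaded jar won.

```java
public class ClasspathProbe {
    public static void main(String[] args) {
        // CodeSource is null for bootstrap-classpath classes, otherwise it
        // points at the jar or directory the class came from.
        java.security.CodeSource src =
                ClasspathProbe.class.getProtectionDomain().getCodeSource();
        System.out.println(src == null ? "bootstrap classpath" : src.getLocation());
    }
}
```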



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346322#comment-15346322
 ] 

ASF GitHub Bot commented on FLINK-3800:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2096#discussion_r68222088
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -1029,21 +1061,25 @@ else if (current == JobStatus.CANCELLING) {
}
}
else if (current == JobStatus.FAILING) {
-   if 
(restartStrategy.canRestart() && transitionState(current, 
JobStatus.RESTARTING)) {
-   // double check in case 
that in the meantime a SuppressRestartsException was thrown
-   if 
(restartStrategy.canRestart()) {
-   
restartStrategy.restart(this);
-   break;
-   } else {
-   fail(new 
Exception("ExecutionGraph went into RESTARTING state but " +
-   "then 
the restart strategy was disabled."));
-   }
-
-   } else if 
(!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, 
failureCause)) {
+   boolean allowRestart = 
!(failureCause instanceof SuppressRestartsException);
+
+   if (allowRestart && 
restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) 
{
+   
restartStrategy.restart(this);
+   break;
+   } else if ((!allowRestart || 
!restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, 
failureCause)) {
postRunCleanup();
break;
}
}
+   else if (current == 
JobStatus.SUSPENDED) {
+   // we've already cleaned up 
when entering the SUSPENDED state
+   break;
+   }
+   else if 
(current.isGloballyTerminalState()) {
+   LOG.warn("Job has entered 
globally terminal state without waiting for all " +
+   "job vertices to reach 
final state.");
+   break;
+   }
else {
fail(new 
Exception("ExecutionGraph went into final state from state " + current));
--- End diff --

Yes you're right. The `break` is missing here. Will add it.


> ExecutionGraphs can become orphans
> --
>
> Key: FLINK-3800
> URL: https://issues.apache.org/jira/browse/FLINK-3800
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{JobManager.cancelAndClearEverything}} method fails all currently 
> executed jobs on the {{JobManager}} and then clears the list of 
> {{currentJobs}} kept in the JobManager. This can become problematic if the 
> user has set a restart strategy for a job, because the {{RestartStrategy}} 
> will try to restart the job. This can lead to unwanted re-deployments of the 
> job, which consume resources and thus trouble the execution of other 
> jobs. If the restart strategy never stops, then this prevents the 
> {{ExecutionGraph}} from ever being properly terminated.
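
The guard in the diff above (the `allowRestart` check) can be distilled into a standalone sketch, with stand-in names for Flink's classes: a `SuppressRestartsException` forces the `FAILED` path even when the restart strategy would otherwise allow a restart.

```java
public class RestartGuardDemo {
    // Stand-in for Flink's exception of the same name.
    static class SuppressRestartsException extends RuntimeException {}

    static String nextState(Throwable failureCause, boolean strategyCanRestart) {
        // Restarts are suppressed regardless of the strategy's answer.
        boolean allowRestart = !(failureCause instanceof SuppressRestartsException);
        return (allowRestart && strategyCanRestart) ? "RESTARTING" : "FAILED";
    }

    public static void main(String[] args) {
        System.out.println(nextState(new RuntimeException("boom"), true));
        System.out.println(nextState(new SuppressRestartsException(), true));
    }
}
```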



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2096#discussion_r68222088
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -1029,21 +1061,25 @@ else if (current == JobStatus.CANCELLING) {
}
}
else if (current == JobStatus.FAILING) {
-   if 
(restartStrategy.canRestart() && transitionState(current, 
JobStatus.RESTARTING)) {
-   // double check in case 
that in the meantime a SuppressRestartsException was thrown
-   if 
(restartStrategy.canRestart()) {
-   
restartStrategy.restart(this);
-   break;
-   } else {
-   fail(new 
Exception("ExecutionGraph went into RESTARTING state but " +
-   "then 
the restart strategy was disabled."));
-   }
-
-   } else if 
(!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, 
failureCause)) {
+   boolean allowRestart = 
!(failureCause instanceof SuppressRestartsException);
+
+   if (allowRestart && 
restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) 
{
+   
restartStrategy.restart(this);
+   break;
+   } else if ((!allowRestart || 
!restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, 
failureCause)) {
postRunCleanup();
break;
}
}
+   else if (current == 
JobStatus.SUSPENDED) {
+   // we've already cleaned up 
when entering the SUSPENDED state
+   break;
+   }
+   else if 
(current.isGloballyTerminalState()) {
+   LOG.warn("Job has entered 
globally terminal state without waiting for all " +
+   "job vertices to reach 
final state.");
+   break;
+   }
else {
fail(new 
Exception("ExecutionGraph went into final state from state " + current));
--- End diff --

Yes you're right. The `break` is missing here. Will add it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346319#comment-15346319
 ] 

ASF GitHub Bot commented on FLINK-3800:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2096#discussion_r68221848
  
--- Diff: flink-runtime/src/test/resources/log4j-test.properties ---
@@ -16,7 +16,7 @@
 # limitations under the License.
 

 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
--- End diff --

Yes, of course. Will fix it.


> ExecutionGraphs can become orphans
> --
>
> Key: FLINK-3800
> URL: https://issues.apache.org/jira/browse/FLINK-3800
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{JobManager.cancelAndClearEverything}} method fails all currently 
> executed jobs on the {{JobManager}} and then clears the list of 
> {{currentJobs}} kept in the JobManager. This can become problematic if the 
> user has set a restart strategy for a job, because the {{RestartStrategy}} 
> will try to restart the job. This can lead to unwanted re-deployments of the 
> job, which consume resources and thus trouble the execution of other 
> jobs. If the restart strategy never stops, then this prevents the 
> {{ExecutionGraph}} from ever being properly terminated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3800) ExecutionGraphs can become orphans

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346318#comment-15346318
 ] 

ASF GitHub Bot commented on FLINK-3800:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2096#discussion_r68221836
  
--- Diff: docs/internals/job_scheduling.md ---
@@ -74,7 +74,28 @@ Besides the vertices, the ExecutionGraph also contains 
the {% gh_link /flink-run
 
 
 
-During its execution, each parallel task goes through multiple stages, 
from *created* to *finished* or *failed*. The diagram below illustrates the 
+Each ExecutionGraph has a job status associated with it.
+This job status indicates the current state of the job execution.
+
+A Flink job is first in the *created* state, then switches to *running* 
and upon completion of all work it switches to *finished*.
+In case of failures, a job switches first to *failing* where it cancels 
all running tasks.
+If all job vertices have reached a final state and the job is not 
restartable, then the job transitions to *failed*.
+If the job can be restarted, then it will enter the *restarting* state.
+Once the job has been completely restarted, it will reach the *created* 
state.
+
+In case that the user cancels the job, it will go into the *cancelling* 
state.
+This is also entails the cancellation of all currently running tasks.
--- End diff --

Thanks for spotting :-)


> ExecutionGraphs can become orphans
> --
>
> Key: FLINK-3800
> URL: https://issues.apache.org/jira/browse/FLINK-3800
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{JobManager.cancelAndClearEverything}} method fails all currently 
> executed jobs on the {{JobManager}} and then clears the list of 
> {{currentJobs}} kept in the JobManager. This can become problematic if the 
> user has set a restart strategy for a job, because the {{RestartStrategy}} 
> will try to restart the job. This can lead to unwanted re-deployments of the 
> job, which consume resources and thus trouble the execution of other 
> jobs. If the restart strategy never stops, then this prevents the 
> {{ExecutionGraph}} from ever being properly terminated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2096#discussion_r68221836
  
--- Diff: docs/internals/job_scheduling.md ---
@@ -74,7 +74,28 @@ Besides the vertices, the ExecutionGraph also contains 
the {% gh_link /flink-run
 
 
 
-During its execution, each parallel task goes through multiple stages, 
from *created* to *finished* or *failed*. The diagram below illustrates the 
+Each ExecutionGraph has a job status associated with it.
+This job status indicates the current state of the job execution.
+
+A Flink job is first in the *created* state, then switches to *running* 
and upon completion of all work it switches to *finished*.
+In case of failures, a job switches first to *failing* where it cancels 
all running tasks.
+If all job vertices have reached a final state and the job is not 
restartable, then the job transitions to *failed*.
+If the job can be restarted, then it will enter the *restarting* state.
+Once the job has been completely restarted, it will reach the *created* 
state.
+
+In case that the user cancels the job, it will go into the *cancelling* 
state.
+This is also entails the cancellation of all currently running tasks.
--- End diff --

Thanks for spotting :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2096: [FLINK-3800] [runtime] Introduce SUSPENDED job sta...

2016-06-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2096#discussion_r68221848
  
--- Diff: flink-runtime/src/test/resources/log4j-test.properties ---
@@ -16,7 +16,7 @@
 # limitations under the License.
 

 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
--- End diff --

Yes, of course. Will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2095: [FLINK-4046] [runtime] Add direct state transition...

2016-06-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2095


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4046) Failing a restarting job can get stuck in JobStatus.FAILING

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346317#comment-15346317
 ] 

ASF GitHub Bot commented on FLINK-4046:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2095


> Failing a restarting job can get stuck in JobStatus.FAILING
> ---
>
> Key: FLINK-4046
> URL: https://issues.apache.org/jira/browse/FLINK-4046
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>
> When a job is in state {{RESTARTING}}, then it can happen that all of its 
> {{ExecutionJobVertices}} are in a final state (if they have not been reset). 
> When calling {{fail}} on this {{ExecutionGraph}} will transition the state to 
> {{FAILING}} and call cancel on all {{ExecutionJobVertices}}. The job state 
> {{FAILING}} can only be left iff all {{ExecutionJobVertices}} have reached a 
> final state. The notification of this final state is only sent to the 
> {{ExecutionGraph}} when all subtasks of an {{ExecutionJobVertex}} have 
> transitioned to a final state. However, this won't happen because the 
> {{ExecutionJobVertices}} are already in a final state. The result is that a 
> job can get stuck in the state {{FAILING}} if {{fail}} is called on a 
> {{RESTARTING}} job.
> I propose to add a direct transition from {{RESTARTING}} to {{FAILED}} as it 
> is the case for the {{cancel}} call (transition from {{RESTARTING}} to 
> {{CANCELED}}).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

