[GitHub] flink issue #2081: [FLINK-4020][streaming-connectors] Move shard list queryi...

2016-06-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2081
  
Hi @rmetzger,
Thanks for letting me know. However, I'd like to close this PR for now for 
the following reasons:

1. The new shard-to-subtask assignment logic introduced with this change 
will actually need to be moved again to run() as part of implementing Kinesis 
reshard handling [FLINK-3231](https://issues.apache.org/jira/browse/FLINK-3231).
2. I've been testing this change a bit more on Kinesis streams with high shard 
counts, and it seems the implementation needs a stronger guarantee that all 
subtasks will be able to get the shard list without failing with Amazon's 
LimitExceededException, even after 3 retries. Since the implementation for 
FLINK-3231 will have a separate thread that polls for changes in the shard 
list, I'd like to strengthen this guarantee as part of FLINK-3231's PR.

I'm almost done with FLINK-3231, and will reopen a PR to resolve FLINK-3231 
and FLINK-4020 together. I'll keep you updated!
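
For context, the throttling issue described above is typically handled by 
retrying the describeStream call with exponential backoff. A minimal sketch, 
assuming the AWS SDK v1 Kinesis client; the class name and retry constants 
here are illustrative, not the PR's actual code:

```java
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;

public class ShardListFetcher {
    private static final int MAX_RETRIES = 3;          // assumed retry limit
    private static final long BASE_BACKOFF_MS = 1000L; // assumed base delay

    // Retry describeStream with exponential backoff so that many parallel
    // subtasks can survive Kinesis throttling.
    public static DescribeStreamResult describeWithBackoff(
            AmazonKinesisClient client, String streamName) throws InterruptedException {
        DescribeStreamRequest request =
                new DescribeStreamRequest().withStreamName(streamName);
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            try {
                return client.describeStream(request);
            } catch (LimitExceededException e) {
                Thread.sleep(BASE_BACKOFF_MS << attempt); // 1s, 2s, 4s, ...
            }
        }
        throw new RuntimeException(
                "Unable to get the shard list after " + MAX_RETRIES + " retries");
    }
}
```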


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4020) Remove shard list querying from Kinesis consumer constructor

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328943#comment-15328943
 ] 

ASF GitHub Bot commented on FLINK-4020:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2081
  
Hi @rmetzger,
Thanks for letting me know. However, I'd like to close this PR for now for 
the following reasons:

1. The new shard-to-subtask assignment logic introduced with this change 
will actually need to be moved again to run() as part of implementing Kinesis 
reshard handling [FLINK-3231](https://issues.apache.org/jira/browse/FLINK-3231).
2. I've been testing this change a bit more on Kinesis streams with high shard 
counts, and it seems the implementation needs a stronger guarantee that all 
subtasks will be able to get the shard list without failing with Amazon's 
LimitExceededException, even after 3 retries. Since the implementation for 
FLINK-3231 will have a separate thread that polls for changes in the shard 
list, I'd like to strengthen this guarantee as part of FLINK-3231's PR.

I'm almost done with FLINK-3231, and will reopen a PR to resolve FLINK-3231 
and FLINK-4020 together. I'll keep you updated!


> Remove shard list querying from Kinesis consumer constructor
> 
>
> Key: FLINK-4020
> URL: https://issues.apache.org/jira/browse/FLINK-4020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Currently FlinkKinesisConsumer is querying for the whole list of shards in 
> the constructor, forcing the client to be able to access Kinesis as well. 
> This is also a drawback for handling Kinesis-side resharding, since we'd want 
> all shard listing / shard-to-task assigning / shard end (result of 
> resharding) handling logic to be done independently within the task life 
> cycle methods, with well-defined, deterministic results.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328918#comment-15328918
 ] 

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hi @sbcd90,
Sorry for the late reply; I'm currently busy with some other things. I'll be 
happy to help review again within the next 2~3 days.


> Add reconnect attempt to Elasticsearch host
> ---
>
> Key: FLINK-3857
> URL: https://issues.apache.org/jira/browse/FLINK-3857
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.1.0, 1.0.2
>Reporter: Fabian Hueske
>Assignee: Subhobrata Dey
>
> Currently, the connection to the Elasticsearch host is opened in 
> {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a 
> changed DNS entry), the sink fails.
> I propose to catch the Exception for lost connections in the {{invoke()}} 
> method and try to re-open the connection for a configurable number of times 
> with a certain delay.
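
A minimal sketch of the proposed behavior, assuming a lost connection surfaces 
as an exception in {{invoke()}}; {{maxRetries}}, {{retryDelayMs}}, {{send()}} 
and {{reopenConnection()}} are hypothetical names, not the sink's actual API:

{code}
@Override
public void invoke(IN element) throws Exception {
    for (int attempt = 0; ; attempt++) {
        try {
            send(element);  // hypothetical helper wrapping the index request
            return;
        } catch (IOException lostConnection) {
            if (attempt >= maxRetries) {
                throw lostConnection;    // give up after the configured limit
            }
            Thread.sleep(retryDelayMs);  // configurable delay between attempts
            reopenConnection();          // hypothetical: re-runs the open() logic
        }
    }
}
{code}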



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...

2016-06-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1962
  
Hi @sbcd90,
Sorry for the late reply; I'm currently busy with some other things. I'll be 
happy to help review again within the next 2~3 days.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-4069) Kafka Consumer should not initialize on construction

2016-06-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328840#comment-15328840
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4069 at 6/14/16 2:50 AM:
-

Thanks for creating this JIRA, Shannon. However, there's already a previously 
opened JIRA for this problem: https://issues.apache.org/jira/browse/FLINK-4023. 
Would you be ok with tracking this issue on FLINK-4023 and closing this one as 
a duplicate? I've added a link to this JIRA on FLINK-4023.


was (Author: tzulitai):
Thanks for creating this JIRA Shannon. However, there's already a previously 
opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. 
Let's track this issue on FLINK-4023 and close this as a duplicate issue :) 
I've referenced a link to this JIRA on FLINK-4023.

> Kafka Consumer should not initialize on construction
> 
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Shannon Carey
>
> The Kafka Consumer connector currently interacts over the network with Kafka 
> in order to get partition metadata when the class is constructed. Instead, it 
> should do that work when the job actually begins to run (for example, in 
> AbstractRichFunction#open() of FlinkKafkaConsumer0?).
> The main weakness of broker querying in the constructor is that if there are 
> network problems, Flink might take a long time (eg. ~1hr) inside the 
> user-supplied main() method while it attempts to contact each broker and 
> perform retries. In general, setting up the Kafka partitions does not seem 
> strictly necessary as part of execution of main() in order to set up the job 
> plan/topology.
> However, as Robert Metzger mentions, there are important concerns with how 
> Kafka partitions are handled:
> "The main reason why we do the querying centrally is:
> a) avoid overloading the brokers
> b) send the same list of partitions (in the same order) to all parallel 
> consumers to do fixed partition assignment (also across restarts). When we 
> do the querying in the open() method, we need to make sure that all 
> partitions are assigned, without duplicates (also after restarts in case of 
> failures)."
> See also the mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4069) Kafka Consumer should not initialize on construction

2016-06-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328840#comment-15328840
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4069 at 6/14/16 2:50 AM:
-

Thanks for creating this JIRA Shannon. However, there's already a previously 
opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. 
Let's track this issue on FLINK-4023 and close this as a duplicate issue :) 
I've referenced a link to this JIRA on FLINK-4023.


was (Author: tzulitai):
Thanks for creating this JIRA Shannon. However, there's already a previously 
opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. 
Let's track this issue on FLINK-4023 on close this as a duplicate issue :) I've 
referenced a link to this JIRA on FLINK-4023.

> Kafka Consumer should not initialize on construction
> 
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Shannon Carey
>
> The Kafka Consumer connector currently interacts over the network with Kafka 
> in order to get partition metadata when the class is constructed. Instead, it 
> should do that work when the job actually begins to run (for example, in 
> AbstractRichFunction#open() of FlinkKafkaConsumer0?).
> The main weakness of broker querying in the constructor is that if there are 
> network problems, Flink might take a long time (eg. ~1hr) inside the 
> user-supplied main() method while it attempts to contact each broker and 
> perform retries. In general, setting up the Kafka partitions does not seem 
> strictly necessary as part of execution of main() in order to set up the job 
> plan/topology.
> However, as Robert Metzger mentions, there are important concerns with how 
> Kafka partitions are handled:
> "The main reason why we do the querying centrally is:
> a) avoid overloading the brokers
> b) send the same list of partitions (in the same order) to all parallel 
> consumers to do fixed partition assignment (also across restarts). When we 
> do the querying in the open() method, we need to make sure that all 
> partitions are assigned, without duplicates (also after restarts in case of 
> failures)."
> See also the mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4069) Kafka Consumer should not initialize on construction

2016-06-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328840#comment-15328840
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4069:


Thanks for creating this JIRA, Shannon. However, there's already a previously 
opened JIRA for this problem: https://issues.apache.org/jira/browse/FLINK-4023. 
Let's track this issue on FLINK-4023 and close this one as a duplicate :) I've 
added a link to this JIRA on FLINK-4023.

> Kafka Consumer should not initialize on construction
> 
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Shannon Carey
>
> The Kafka Consumer connector currently interacts over the network with Kafka 
> in order to get partition metadata when the class is constructed. Instead, it 
> should do that work when the job actually begins to run (for example, in 
> AbstractRichFunction#open() of FlinkKafkaConsumer0?).
> The main weakness of broker querying in the constructor is that if there are 
> network problems, Flink might take a long time (eg. ~1hr) inside the 
> user-supplied main() method while it attempts to contact each broker and 
> perform retries. In general, setting up the Kafka partitions does not seem 
> strictly necessary as part of execution of main() in order to set up the job 
> plan/topology.
> However, as Robert Metzger mentions, there are important concerns with how 
> Kafka partitions are handled:
> "The main reason why we do the querying centrally is:
> a) avoid overloading the brokers
> b) send the same list of partitions (in the same order) to all parallel 
> consumers to do fixed partition assignment (also across restarts). When we 
> do the querying in the open() method, we need to make sure that all 
> partitions are assigned, without duplicates (also after restarts in case of 
> failures)."
> See also the mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4023) Move Kafka consumer partition discovery from constructor to open()

2016-06-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328831#comment-15328831
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4023:


https://issues.apache.org/jira/browse/FLINK-4069
The above is a duplicate of this JIRA, opened after this one. I'm referencing 
the link here since the description in FLINK-4069 covers additional 
information on the problem, as well as a link to a related discussion thread 
on the mailing list.

> Move Kafka consumer partition discovery from constructor to open()
> --
>
> Key: FLINK-4023
> URL: https://issues.apache.org/jira/browse/FLINK-4023
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Michal Harish
>Priority: Minor
>  Labels: kafka, kafka-0.8
>
> Currently, Flink queries Kafka for partition information when creating the 
> Kafka consumer. This is done on the client when submitting the Flink job, 
> which requires the client to be able to fetch the partition data from Kafka 
> which may only be accessible from the cluster environment where the tasks 
> will be running. Moving the partition discovery to the open() method should 
> solve this problem.
>  
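
To make the proposal concrete, here is a minimal sketch of a source that 
defers discovery to {{open()}} (illustrative only; {{fetchPartitionsFromKafka()}} 
is a hypothetical helper, not the consumer's actual code):

{code}
public class DeferredDiscoverySource extends RichParallelSourceFunction<String> {
    private final String topic;
    private transient List<Integer> partitions;
    private volatile boolean running = true;

    public DeferredDiscoverySource(String topic) {
        this.topic = topic;  // no network access on the client side
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs inside the task life cycle, where Kafka is reachable.
        partitions = fetchPartitionsFromKafka(topic);  // hypothetical helper
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) { /* consume the assigned partitions */ }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}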



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4061) about flink jdbc connect oracle db exists a critical bug

2016-06-13 Thread dengchangfu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328758#comment-15328758
 ] 

dengchangfu commented on FLINK-4061:


In China, integrating Flink with traditional relational databases is very 
important. If Flink can do this, it will quickly fill a gap in the market for 
both batch and streaming. I've been using Spark for almost two years, but it 
does not really meet real-time requirements, so I want to try Flink. Thanks.

>  about flink jdbc connect oracle db exists a critical bug
> ---
>
> Key: FLINK-4061
> URL: https://issues.apache.org/jira/browse/FLINK-4061
> Project: Flink
>  Issue Type: Bug
>  Components: Batch
>Affects Versions: 1.1.0
> Environment: ubuntu ,jdk1.8.0  ,Start a Local Flink Cluster
>Reporter: dengchangfu
>Priority: Blocker
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I use flink-jdbc to connect to an Oracle DB for ETL, so I wrote a demo to 
> test the feature. The code is simple, but after I submit this app, an 
> exception happens.
> exception info like this:
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:231)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> my code like this:
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
> import org.apache.flink.api.table.Row;
> import org.apache.flink.api.table.typeutils.RowTypeInfo;
> import 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
> import java.sql.ResultSet;
> import java.sql.Types;
> /**
>  * Skeleton for a Flink Job.
>  *
>  * For a full example of a Flink Job, see the WordCountJob.java file in the
>  * same package/directory or have a look at the website.
>  *
>  * You can also generate a .jar file that you can submit on your Flink
>  * cluster.
>  * Just type
>  *mvn clean package
>  * in the projects root directory.
>  * You will find the jar in
>  *target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
>  *
>  */
> public class Job {
> public static final TypeInformation[] fieldTypes = new 
> TypeInformation[]{
> BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.FLOAT_TYPE_INFO
> };
> public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
> public static void main(String[] args) {
> // set up the execution environment
> ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> JDBCInputFormatBuilder inputBuilder = 
> JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
> .setUsername("crmii")
> .setPassword("crmii")
> .setQuery("select CLIENT_ID,OCCUR_BALANCE from 
> HS_ASSET.FUNDJOUR@OTC")
> .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
> .setRowTypeInfo(rowTypeInfo);
> DataSet<Row> source = env.createInput(inputBuilder.finish());
> source.output(JDBCOutputFormat.buildJDBCOutputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
> .setUsername("crmii")
> .setPassword("crmii")
> .setQuery("insert into dengabc (client_id,salary) 
> values(?,?)")
> .setSqlTypes(new int[]{Types.VARCHAR, Types.DOUBLE})
> .finish());
> //source.print();
> //source.first(20).print();
> //dbData.print();
> /**
>  * Here, you can start creating your execution plan for Flink.
>  *
>  * Start with getting some data from the environment, like
>  *env.readTextFile(textPath);
>  *
>  * then, transform the resulting DataSet using operations
>  * like
>  *.filter()
>  *.flatMap()
>  *.join()
>  *.coGroup()
>  * and many more.
>  * Have a look at the programming guide for the Java API:
>  *
>  * http://flink.apache.org/docs/latest/apis/batch/index.html
>  *
>  * and the examples
>  *
>  * 

[jira] [Commented] (FLINK-4061) about flink jdbc connect oracle db exists a critical bug

2016-06-13 Thread dengchangfu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328749#comment-15328749
 ] 

dengchangfu commented on FLINK-4061:


I also think this is the problem: the statement was not initialized. But if I 
want to use flink-jdbc to connect to Oracle, what should I do? I tried 1.0.3 
(the latest); there were some other exceptions, so I cloned the source code, 
compiled it, and then used the jar (1.1-SNAPSHOT).

>  about flink jdbc connect oracle db exists a critical bug
> ---
>
> Key: FLINK-4061
> URL: https://issues.apache.org/jira/browse/FLINK-4061
> Project: Flink
>  Issue Type: Bug
>  Components: Batch
>Affects Versions: 1.1.0
> Environment: ubuntu ,jdk1.8.0  ,Start a Local Flink Cluster
>Reporter: dengchangfu
>Priority: Blocker
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I use flink-jdbc to connect to an Oracle DB for ETL, so I wrote a demo to 
> test the feature. The code is simple, but after I submit this app, an 
> exception happens.
> exception info like this:
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:231)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> my code like this:
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
> import org.apache.flink.api.table.Row;
> import org.apache.flink.api.table.typeutils.RowTypeInfo;
> import 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
> import java.sql.ResultSet;
> import java.sql.Types;
> /**
>  * Skeleton for a Flink Job.
>  *
>  * For a full example of a Flink Job, see the WordCountJob.java file in the
>  * same package/directory or have a look at the website.
>  *
>  * You can also generate a .jar file that you can submit on your Flink
>  * cluster.
>  * Just type
>  *mvn clean package
>  * in the projects root directory.
>  * You will find the jar in
>  *target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
>  *
>  */
> public class Job {
> public static final TypeInformation[] fieldTypes = new 
> TypeInformation[]{
> BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.FLOAT_TYPE_INFO
> };
> public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
> public static void main(String[] args) {
> // set up the execution environment
> ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> JDBCInputFormatBuilder inputBuilder = 
> JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
> .setUsername("crmii")
> .setPassword("crmii")
> .setQuery("select CLIENT_ID,OCCUR_BALANCE from 
> HS_ASSET.FUNDJOUR@OTC")
> .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
> .setRowTypeInfo(rowTypeInfo);
> DataSet<Row> source = env.createInput(inputBuilder.finish());
> source.output(JDBCOutputFormat.buildJDBCOutputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
> .setUsername("crmii")
> .setPassword("crmii")
> .setQuery("insert into dengabc (client_id,salary) 
> values(?,?)")
> .setSqlTypes(new int[]{Types.VARCHAR, Types.DOUBLE})
> .finish());
> //source.print();
> //source.first(20).print();
> //dbData.print();
> /**
>  * Here, you can start creating your execution plan for Flink.
>  *
>  * Start with getting some data from the environment, like
>  *env.readTextFile(textPath);
>  *
>  * then, transform the resulting DataSet using operations
>  * like
>  *.filter()
>  *.flatMap()
>  *.join()
>  *.coGroup()
>  * and many more.
>  * Have a look at the programming guide for the Java API:
>  *
>  * http://flink.apache.org/docs/latest/apis/batch/index.html
>  *
>  * and the examples
>  *
>  * http://flink.apache.org/docs/latest/apis/batch/examples.html
>

[jira] [Commented] (FLINK-4061) about flink jdbc connect oracle db exists a critical bug

2016-06-13 Thread dengchangfu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328742#comment-15328742
 ] 

dengchangfu commented on FLINK-4061:


I have added the Oracle jar as a dependency in the pom, but this is really a 
problem. User-friendliness is really important for open source. Thank you for 
the reply.

>  about flink jdbc connect oracle db exists a critical bug
> ---
>
> Key: FLINK-4061
> URL: https://issues.apache.org/jira/browse/FLINK-4061
> Project: Flink
>  Issue Type: Bug
>  Components: Batch
>Affects Versions: 1.1.0
> Environment: ubuntu ,jdk1.8.0  ,Start a Local Flink Cluster
>Reporter: dengchangfu
>Priority: Blocker
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I use flink-jdbc to connect to an Oracle DB for ETL, so I wrote a demo to 
> test the feature. The code is simple, but after I submit this app, an 
> exception happens.
> exception info like this:
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:231)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> my code like this:
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
> import org.apache.flink.api.table.Row;
> import org.apache.flink.api.table.typeutils.RowTypeInfo;
> import 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
> import java.sql.ResultSet;
> import java.sql.Types;
> /**
>  * Skeleton for a Flink Job.
>  *
>  * For a full example of a Flink Job, see the WordCountJob.java file in the
>  * same package/directory or have a look at the website.
>  *
>  * You can also generate a .jar file that you can submit on your Flink
>  * cluster.
>  * Just type
>  *mvn clean package
>  * in the projects root directory.
>  * You will find the jar in
>  *target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
>  *
>  */
> public class Job {
> public static final TypeInformation[] fieldTypes = new 
> TypeInformation[]{
> BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.FLOAT_TYPE_INFO
> };
> public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
> public static void main(String[] args) {
> // set up the execution environment
> ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> JDBCInputFormatBuilder inputBuilder = 
> JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
> .setUsername("crmii")
> .setPassword("crmii")
> .setQuery("select CLIENT_ID,OCCUR_BALANCE from 
> HS_ASSET.FUNDJOUR@OTC")
> .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
> .setRowTypeInfo(rowTypeInfo);
> DataSet<Row> source = env.createInput(inputBuilder.finish());
> source.output(JDBCOutputFormat.buildJDBCOutputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
> .setUsername("crmii")
> .setPassword("crmii")
> .setQuery("insert into dengabc (client_id,salary) 
> values(?,?)")
> .setSqlTypes(new int[]{Types.VARCHAR, Types.DOUBLE})
> .finish());
> //source.print();
> //source.first(20).print();
> //dbData.print();
> /**
>  * Here, you can start creating your execution plan for Flink.
>  *
>  * Start with getting some data from the environment, like
>  *env.readTextFile(textPath);
>  *
>  * then, transform the resulting DataSet using operations
>  * like
>  *.filter()
>  *.flatMap()
>  *.join()
>  *.coGroup()
>  * and many more.
>  * Have a look at the programming guide for the Java API:
>  *
>  * http://flink.apache.org/docs/latest/apis/batch/index.html
>  *
>  * and the examples
>  *
>  * http://flink.apache.org/docs/latest/apis/batch/examples.html
>  *
>  */
> // execute program
> try {
> env.execute("Flink Java API Skeleton");
> } catch 

[jira] [Commented] (FLINK-3971) Aggregates handle null values incorrectly.

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328667#comment-15328667
 ] 

ASF GitHub Bot commented on FLINK-3971:
---

Github user gallenvara commented on the issue:

https://github.com/apache/flink/pull/2049
  
@fhueske Done.


> Aggregates handle null values incorrectly.
> --
>
> Key: FLINK-3971
> URL: https://issues.apache.org/jira/browse/FLINK-3971
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Critical
> Fix For: 1.1.0
>
>
> Table API and SQL aggregates are supposed to ignore null values, e.g., 
> {{sum(1,2,null,4)}} is supposed to return {{7}}. 
> The current implementation is correct if at least one valid value is 
> present; however, it is incorrect if only null values are aggregated. {{sum(null, 
> null, null)}} should return {{null}} instead of {{0}}.
> Currently only the Count aggregate handles the case of null-values-only 
> correctly.
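
A small sketch of the expected semantics (illustrative, not the Table API's 
actual aggregate code): the sum skips nulls but stays {{null}} when no 
non-null value is seen:

{code}
static Integer nullAwareSum(Integer... values) {
    Integer sum = null;
    for (Integer v : values) {
        if (v != null) {
            sum = (sum == null) ? v : sum + v;  // ignore null values
        }
    }
    return sum;  // sum(1, 2, null, 4) == 7; sum(null, null, null) == null
}
{code}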



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

2016-06-13 Thread gallenvara
Github user gallenvara commented on the issue:

https://github.com/apache/flink/pull/2049
  
@fhueske Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328612#comment-15328612
 ] 

ASF GitHub Bot commented on FLINK-3921:
---

Github user rekhajoshm commented on the issue:

https://github.com/apache/flink/pull/2060
  
Sorry for the delay, I got busy. 
Thank you @StephanEwen for your review; updated. Please have a look. Thanks. 
cc @greghogan 


> StringParser not specifying encoding to use
> ---
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.3
>Reporter: Tatu Saloranta
>Assignee: Rekha Joshi
>Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents 
> are expected to be Ascii, similar to `StringValueParser`. That makes sense, 
> but when constructing the actual instance, no encoding is specified; on line 
> 66, for example:
> this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever the default platform encoding is. If contents 
> really are always Ascii (would not count on that as parser is used from CSV 
> reader), not a big deal, but it can lead to the usual Latin-1-VS-UTF-8 issues.
> So I think that encoding should be explicitly specified, whatever is to be 
> used: javadocs claim ascii, so could be "us-ascii", but could well be UTF-8 
> or even ISO-8859-1.
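
A sketch of the minimal fix, assuming the charset is pinned explicitly (which 
charset to pin is exactly the open question above):

{code}
// Line 66, with the encoding made explicit instead of the platform default:
this.result = new String(bytes, startPos + 1, i - startPos - 2,
        java.nio.charset.StandardCharsets.US_ASCII);  // or UTF_8 / ISO_8859_1
{code}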



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2060: [FLINK-3921] StringParser encoding

2016-06-13 Thread rekhajoshm
Github user rekhajoshm commented on the issue:

https://github.com/apache/flink/pull/2060
  
Sorry for the delay, I got busy. 
Thank you @StephanEwen for your review; updated. Please have a look. Thanks. 
cc @greghogan 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4069) Kafka Consumer should not initialize on construction

2016-06-13 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-4069:


 Summary: Kafka Consumer should not initialize on construction
 Key: FLINK-4069
 URL: https://issues.apache.org/jira/browse/FLINK-4069
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.0.3
Reporter: Shannon Carey


The Kafka Consumer connector currently interacts over the network with Kafka in 
order to get partition metadata when the class is constructed. Instead, it 
should do that work when the job actually begins to run (for example, in 
AbstractRichFunction#open() of FlinkKafkaConsumer0?).

The main weakness of broker querying in the constructor is that if there are 
network problems, Flink might take a long time (eg. ~1hr) inside the 
user-supplied main() method while it attempts to contact each broker and 
perform retries. In general, setting up the Kafka partitions does not seem 
strictly necessary as part of execution of main() in order to set up the job 
plan/topology.

However, as Robert Metzger mentions, there are important concerns with how 
Kafka partitions are handled:

"The main reason why we do the querying centrally is:
a) avoid overloading the brokers
b) send the same list of partitions (in the same order) to all parallel 
consumers to do fixed partition assignment (also across restarts). When we 
do the querying in the open() method, we need to make sure that all partitions 
are assigned, without duplicates (also after restarts in case of failures)."

See also the mailing list discussion: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html
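
To make concern (b) concrete, a deterministic assignment that every subtask 
can compute independently in open() might look like the following sketch 
(assuming each subtask fetches the same partition list; not Flink's actual 
code):

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Every subtask sorts the same partition list and takes every
// parallelism-th element, so all partitions are covered exactly once,
// with no coordination and no duplicates, even after restarts.
static List<Integer> assignPartitions(
        List<Integer> allPartitions, int subtaskIndex, int parallelism) {
    List<Integer> sorted = new ArrayList<>(allPartitions);
    Collections.sort(sorted);
    List<Integer> mine = new ArrayList<>();
    for (int i = 0; i < sorted.size(); i++) {
        if (i % parallelism == subtaskIndex) {
            mine.add(sorted.get(i));
        }
    }
    return mine;
}
{code}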



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3340) Fix object juggling in drivers

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328014#comment-15328014
 ] 

ASF GitHub Bot commented on FLINK-3340:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/1626
  
Thank you @xhumanoid for reporting this! I will make the fix.


> Fix object juggling in drivers
> --
>
> Key: FLINK-3340
> URL: https://issues.apache.org/jira/browse/FLINK-3340
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Critical
> Fix For: 1.0.0
>
>
> {{ReduceDriver}}, {{ReduceCombineDriver}}, and {{ChainedAllReduceDriver}} are 
> not properly tracking objects for reuse.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1626: [FLINK-3340] [runtime] Fix object juggling in drivers

2016-06-13 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/1626
  
Thank you @xhumanoid for reporting this! I will make the fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3323) Add documentation for NiFi connector

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327932#comment-15327932
 ] 

ASF GitHub Bot commented on FLINK-3323:
---

GitHub user smarthi opened a pull request:

https://github.com/apache/flink/pull/2099

Flink 3323: Add documentation for NiFi connector

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-3323] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smarthi/flink FLINK-3323

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2099.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2099


commit f9a2e54cba178f1931528fb31638c0e72705b7b4
Author: smarthi 
Date:   2016-05-30T13:04:25Z

Initial Commit of nifi docs

commit db1d8628f103bc7d319db09a151bb555e48694f9
Author: smarthi 
Date:   2016-05-31T11:24:11Z

Initial Commit of nifi docs

commit 878c74254abac1cb62d1ebef822b8c4f31cd48a2
Author: smarthi 
Date:   2016-06-13T18:43:43Z

FLINK-3323: Add documentation for NiFi connector




> Add documentation for NiFi connector
> 
>
> Key: FLINK-3323
> URL: https://issues.apache.org/jira/browse/FLINK-3323
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Matthias J. Sax
>Assignee: Suneel Marthi
>Priority: Minor
>
> Add Nifi-connector documentation to the "Data Stream / Connectors" web page 
> docs. See also FLINK-3324



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3323) Add documentation for NiFi connector

2016-06-13 Thread Suneel Marthi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suneel Marthi updated FLINK-3323:
-
Summary: Add documentation for NiFi connector  (was: Nifi connector not 
documented)

> Add documentation for NiFi connector
> 
>
> Key: FLINK-3323
> URL: https://issues.apache.org/jira/browse/FLINK-3323
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Matthias J. Sax
>Assignee: Suneel Marthi
>Priority: Minor
>
> Add Nifi-connector documentation to the "Data Stream / Connectors" web page 
> docs. See also FLINK-3324



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4066) RabbitMQ source, customize queue arguments

2016-06-13 Thread Kanstantsin Kamkou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kanstantsin Kamkou closed FLINK-4066.
-

> RabbitMQ source, customize queue arguments
> --
>
> Key: FLINK-4066
> URL: https://issues.apache.org/jira/browse/FLINK-4066
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kanstantsin Kamkou
>Priority: Minor
>  Labels: queue, rabbitmq
> Fix For: 1.1.0
>
>
> Please add functionality to customize this line (custom attributes for a 
> queue).
> {code}channel.queueDeclare(queueName, false, false, false, null);{code}
> Thank you.
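
A sketch of the requested change, assuming a {{java.util.Map}} of queue 
arguments is exposed to the user ({{x-message-ttl}} is just one example 
attribute):

{code}
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 60000);  // example custom attribute
channel.queueDeclare(queueName, false, false, false, arguments);
{code}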



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4051) RabbitMQ Source might not react to cancel signal

2016-06-13 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327788#comment-15327788
 ] 

Robert Metzger commented on FLINK-4051:
---

Great, Thank you!

> RabbitMQ Source might not react to cancel signal
> 
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> it's listening to an empty queue.
> Fix: call nextDelivery() with a timeout.
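
A sketch of the suggested fix, assuming the source keeps a {{running}} 
cancellation flag ({{processDelivery()}} and the 100 ms timeout are 
illustrative):

{code}
while (running) {
    // nextDelivery(timeout) returns null on timeout instead of blocking
    // forever, so the loop re-checks the cancellation flag periodically.
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(100);
    if (delivery != null) {
        processDelivery(delivery);  // hypothetical helper
    }
}
{code}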



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4045) Test AMQP connector using Apache ActiveMQ emebdded broker

2016-06-13 Thread Subhankar Biswas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Subhankar Biswas reassigned FLINK-4045:
---

Assignee: Subhankar Biswas

> Test AMQP connector using Apache ActiveMQ emebdded broker
> -
>
> Key: FLINK-4045
> URL: https://issues.apache.org/jira/browse/FLINK-4045
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> Currently, our (RabbitMQ) AMQP client is not tested in any integration tests.
> Apache ActiveMQ implements an AMQP broker and they provide an embedded 
> implementation we can use for the integration tests: 
> http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4051) RabbitMQ Source might not react to cancel signal

2016-06-13 Thread Subhankar Biswas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327758#comment-15327758
 ] 

Subhankar Biswas commented on FLINK-4051:
-

OK, I'll start working on it.

> RabbitMQ Source might not react to cancel signal
> 
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> it's listening to an empty queue.
> Fix: call nextDelivery() with a timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3944) Add optimization rules to reorder Cartesian products and joins

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327693#comment-15327693
 ] 

ASF GitHub Bot commented on FLINK-3944:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/2098

[revert] [FLINK-3944] [tableAPI] Reverts "Add rewrite rules to reorder 
Cartesian products and joins."

This reverts commit 85793a25a78ba5be96fabc2a26569318c6b53853.
Added rewrite rules blow up search space which cannot be effectively pruned 
without cardinality estimates.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableCrossRevert

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2098


commit df438a9fe5e9fcd191c371a888056c49aa246992
Author: Fabian Hueske 
Date:   2016-06-13T16:18:42Z

[revert] [FLINK-3944] [tableAPI] Reverts "Add rewrite rules to reorder 
Cartesian products and joins."

This reverts commit 85793a25a78ba5be96fabc2a26569318c6b53853.
Added rewrite rules blow up search space which cannot be effectively pruned 
without cardinality estimates.




> Add optimization rules to reorder Cartesian products and joins
> --
>
> Key: FLINK-3944
> URL: https://issues.apache.org/jira/browse/FLINK-3944
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.1.0
>
>
> Currently, we do not support the execution of Cartesian products. 
> Because we do not optimize the order of joins (due to missing statistics), 
> joins are executed in the order in which they are specified. This works well 
> for the Table API, however it can be problematic in case of SQL queries where 
> the order of tables in the FROM clause should not matter.
> In case of SQL queries, it can happen that the optimized plan contains 
> Cartesian products because joins are not reordered. If we add optimization 
> rules that switch Cartesian products and joins, such situations can be 
> resolved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2098: [revert] [FLINK-3944] [tableAPI] Reverts "Add rewr...

2016-06-13 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/2098

[revert] [FLINK-3944] [tableAPI] Reverts "Add rewrite rules to reorder 
Cartesian products and joins."

This reverts commit 85793a25a78ba5be96fabc2a26569318c6b53853.
Added rewrite rules blow up search space which cannot be effectively pruned 
without cardinality estimates.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableCrossRevert

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2098


commit df438a9fe5e9fcd191c371a888056c49aa246992
Author: Fabian Hueske 
Date:   2016-06-13T16:18:42Z

[revert] [FLINK-3944] [tableAPI] Reverts "Add rewrite rules to reorder 
Cartesian products and joins."

This reverts commit 85793a25a78ba5be96fabc2a26569318c6b53853.
Added rewrite rules blow up search space which cannot be effectively pruned 
without cardinality estimates.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3971) Aggregates handle null values incorrectly.

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327689#comment-15327689
 ] 

ASF GitHub Bot commented on FLINK-3971:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2049
  
Hi @gallenvara, you have a merge commit in your changes.
Can you rebase your changes to the current master and squash the commits? 
Thanks!


> Aggregates handle null values incorrectly.
> --
>
> Key: FLINK-3971
> URL: https://issues.apache.org/jira/browse/FLINK-3971
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Critical
> Fix For: 1.1.0
>
>
> Table API and SQL aggregates are supposed to ignore null values, e.g., 
> {{sum(1,2,null,4)}} is supposed to return {{7}}. 
> There current implementation is correct if at least one valid value is 
> present however, is incorrect if only null values are aggregated. {{sum(null, 
> null, null)}} should return {{null}} instead of {{0}}
> Currently only the Count aggregate handles the case of null-values-only 
> correctly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

2016-06-13 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2049
  
Hi @gallenvara, you have a merge commit in your changes.
Can you rebase your changes to the current master and squash the commits? 
Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3944) Add optimization rules to reorder Cartesian products and joins

2016-06-13 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327687#comment-15327687
 ] 

Fabian Hueske commented on FLINK-3944:
--

I will open a PR to revert the commit.

> Add optimization rules to reorder Cartesian products and joins
> --
>
> Key: FLINK-3944
> URL: https://issues.apache.org/jira/browse/FLINK-3944
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.1.0
>
>
> Currently, we do not support the execution of Cartesian products. 
> Because we do not optimize the order of joins (due to missing statistics), 
> joins are executed in the order in which they are specified. This works well 
> for the Table API, however it can be problematic in case of SQL queries where 
> the order of tables in the FROM clause should not matter.
> In case of SQL queries, it can happen that the optimized plan contains 
> Cartesian products because joins are not reordered. If we add optimization 
> rules that switch Cartesian products and joins, such situations can be 
> resolved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (FLINK-3944) Add optimization rules to reorder Cartesian products and joins

2016-06-13 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reopened FLINK-3944:
--

> Add optimization rules to reorder Cartesian products and joins
> --
>
> Key: FLINK-3944
> URL: https://issues.apache.org/jira/browse/FLINK-3944
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.1.0
>
>
> Currently, we do not support the execution of Cartesian products. 
> Because we do not optimize the order of joins (due to missing statistics), 
> joins are executed in the order in which they are specified. This works well 
> for the Table API, however it can be problematic in case of SQL queries where 
> the order of tables in the FROM clause should not matter.
> In case of SQL queries, it can happen that the optimized plan contains 
> Cartesian products because joins are not reordered. If we add optimization 
> rules that switch Cartesian products and joins, such situations can be 
> resolved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3944) Add optimization rules to reorder Cartesian products and joins

2016-06-13 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327666#comment-15327666
 ] 

Fabian Hueske commented on FLINK-3944:
--

I tried the fix on another query and found that the search space grew too 
large due to the added rewrite rules. On a moderately complex query, the 
optimization did not terminate. Since we do not have reliable estimates, the 
optimizer cannot effectively prune the search space, and the enumeration of 
alternatives takes too much time. 

I propose to revert this commit. Users will have to specify tables in an order 
that does not include Cartesian products.

> Add optimization rules to reorder Cartesian products and joins
> --
>
> Key: FLINK-3944
> URL: https://issues.apache.org/jira/browse/FLINK-3944
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.1.0
>
>
> Currently, we do not support the execution of Cartesian products. 
> Because we do not optimize the order of joins (due to missing statistics), 
> joins are executed in the order in which they are specified. This works well 
> for the Table API, however it can be problematic in case of SQL queries where 
> the order of tables in the FROM clause should not matter.
> In case of SQL queries, it can happen that the optimized plan contains 
> Cartesian products because joins are not reordered. If we add optimization 
> rules that switch Cartesian products and joins, such situations can be 
> resolved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3340) Fix object juggling in drivers

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327660#comment-15327660
 ] 

ASF GitHub Bot commented on FLINK-3340:
---

Github user xhumanoid commented on the issue:

https://github.com/apache/flink/pull/1626
  
Hi, I know the issue is closed, but it may have a small bug:


https://github.com/apache/flink/pull/1626/files#diff-4a133896fec62bcabc1120b0df8cb8daR205

in hashCode, there is recursion on this.hashCode()
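
The kind of bug being pointed out, as a minimal illustration (not the PR's 
exact code; {{value}} stands in for whatever field should back the hash):

{code}
@Override
public int hashCode() {
    return this.hashCode();      // bug: unbounded recursion -> StackOverflowError
    // return value.hashCode();  // intended: delegate to a field
}
{code}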


> Fix object juggling in drivers
> --
>
> Key: FLINK-3340
> URL: https://issues.apache.org/jira/browse/FLINK-3340
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Critical
> Fix For: 1.0.0
>
>
> {{ReduceDriver}}, {{ReduceCombineDriver}}, and {{ChainedAllReduceDriver}} are 
> not properly tracking objects for reuse.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1626: [FLINK-3340] [runtime] Fix object juggling in drivers

2016-06-13 Thread xhumanoid
Github user xhumanoid commented on the issue:

https://github.com/apache/flink/pull/1626
  
Hi, I know the issue is closed, but it may have a small bug:


https://github.com/apache/flink/pull/1626/files#diff-4a133896fec62bcabc1120b0df8cb8daR205

in hashCode, there is recursion on this.hashCode()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327635#comment-15327635
 ] 

ASF GitHub Bot commented on FLINK-3859:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2088
  
@fhueske Thanks for reviewing my code. I totally forgot to test the 
comparisons. I will fix the issues and see what I can do for the fixed 
precision / scale cases. It would be great if you could implement the 
corresponding aggregation functions.


> Add BigDecimal/BigInteger support to Table API
> --
>
> Key: FLINK-3859
> URL: https://issues.apache.org/jira/browse/FLINK-3859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Since FLINK-3786 has been solved, we can now start integrating 
> BigDecimal/BigInteger into the Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327625#comment-15327625
 ] 

ASF GitHub Bot commented on FLINK-3859:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r66814270
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
 ---
@@ -143,15 +143,30 @@ object ScalarFunctions {
 new MultiTypeMethodCallGen(BuiltInMethods.ABS))
 
   addSqlFunction(
+ABS,
+Seq(BIG_DEC_TYPE_INFO),
+new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC))
+
+  addSqlFunction(
--- End diff --

`FLOAT`, `INT`, etc. are supported implicitly as they `shouldAutocastTo` 
`DOUBLE`. The most suitable method is later chosen by the Java compiler. 
Calcite's `SqlFunctions` provides methods for all basic types.


> Add BigDecimal/BigInteger support to Table API
> --
>
> Key: FLINK-3859
> URL: https://issues.apache.org/jira/browse/FLINK-3859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Since FLINK-3786 has been solved, we can now start integrating 
> BigDecimal/BigInteger into the Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.

2016-06-13 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-4068:


 Summary: Move constant computations out of code-generated 
`flatMap` functions.
 Key: FLINK-4068
 URL: https://issues.apache.org/jira/browse/FLINK-4068
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Affects Versions: 1.1.0
Reporter: Fabian Hueske


The generated functions for expressions of the Table API or SQL include 
constant computations.

For instance, the code generated for a predicate like:

{code}
myInt < (10 + 20)
{code}

looks roughly like:

{code}

public void flatMap(Row in, Collector out) {

  Integer in1 = in.productElement(1);
  int temp = 10 + 20;  
  if (in1 < temp) {
out.collect(in);
  }
}
{code}

In this example the computation of {{temp}} is constant and could be moved out 
of the {{flatMap()}} method.

The same might apply to generated functions other than {{FlatMap}} as well.
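
A sketch of the hoisted version (hypothetical generated class name; a member initializer is chosen here only for illustration):

{code}
public class GeneratedFlatMap {

  // constant subexpression evaluated once instead of once per record
  private final int temp = 10 + 20;

  public void flatMap(Row in, Collector out) {
    Integer in1 = in.productElement(1);
    if (in1 < temp) {
      out.collect(in);
    }
  }
}
{code}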



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4051) RabbitMQ Source might not react to cancel signal

2016-06-13 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327604#comment-15327604
 ] 

Robert Metzger commented on FLINK-4051:
---

On the other hand, the Javadoc of take() states that the method is 
interruptible, and Flink interrupts a thread if it's not reacting to a 
cancel signal.
Therefore, I'm wondering if this is really an issue or not.

It seems that you are very eager to contribute to Flink. How about the 
following?
- You look into FLINK-4045 (without the renaming, just adding tests for the 
RabbitMQ connector)
- Once that is finished, we can add a specific test for RMQ source 
cancellation. Then, we can experiment with it and see how it reacts.

What do you think?

> RabbitMQ Source might not react to cancel signal
> 
>
> Key: FLINK-4051
> URL: https://issues.apache.org/jira/browse/FLINK-4051
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Subhankar Biswas
>
> As reported here 
> https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517,
>  the RabbitMQ source might block forever / ignore the cancelling signal, if 
> it's listening to an empty queue.
> Fix: call nextDelivery() with a timeout.
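
A minimal sketch of that timeout-based fix, assuming the RabbitMQ Java client's QueueingConsumer; the running flag and process() method are illustrative, not the actual connector code:

{code}
import com.rabbitmq.client.QueueingConsumer;

class TimeoutPollingLoop {
    private volatile boolean running = true; // cleared by cancel()

    void run(QueueingConsumer consumer) throws Exception {
        while (running) {
            // Returns null after 100 ms instead of blocking forever on an
            // empty queue, so the cancel flag is re-checked periodically.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery(100);
            if (delivery != null) {
                process(delivery.getBody());
            }
        }
    }

    void process(byte[] body) { /* deserialize and emit downstream */ }
}
{code}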



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327591#comment-15327591
 ] 

ASF GitHub Bot commented on FLINK-3859:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2088
  
Think we also need aggregation functions for `DECIMAL` otherwise this won't 
work:
```
SELECT sum(myInt * 1.23) FROM MyTable
```


> Add BigDecimal/BigInteger support to Table API
> --
>
> Key: FLINK-3859
> URL: https://issues.apache.org/jira/browse/FLINK-3859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Since FLINK-3786 has been solved, we can now start integrating 
> BigDecimal/BigInteger into the Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-06-13 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327589#comment-15327589
 ] 

Robert Metzger commented on FLINK-3763:
---

Okay. I'll investigate this issue and then decide how to proceed with 
FLINK-4051.
Thank you for the report.

> RabbitMQ Source/Sink standardize connection parameters
> --
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
> Fix For: 1.1.0
>
>
> The RabbitMQ source and sink should have the same capabilities in terms of 
> establishing a connection, currently the sink is lacking connection 
> parameters that are available on the source. Additionally, VirtualHost should 
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified 
> it goes to the vhost '/').
> Connection Parameters
> ===
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offering the URI as a valid constructor, because 
> that would supply all 5 of the above parameters in a single String.
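
For reference, the RabbitMQ Java client already supports this via ConnectionFactory.setUri(); a minimal sketch (hypothetical wrapper class, not the connector API):

{code}
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

class UriConnectSketch {
    static Connection connect(String uri) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // "amqp://user:password@host:5672/vhost" carries all five parameters.
        factory.setUri(uri);
        return factory.newConnection();
    }
}
{code}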



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger support to...

2016-06-13 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2088
  
Think we also need aggregation functions for `DECIMAL` otherwise this won't 
work:
```
SELECT sum(myInt * 1.23) FROM MyTable
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4066) RabbitMQ source, customize queue arguments

2016-06-13 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4066:
--
Fix Version/s: 1.1.0

> RabbitMQ source, customize queue arguments
> --
>
> Key: FLINK-4066
> URL: https://issues.apache.org/jira/browse/FLINK-4066
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kanstantsin Kamkou
>Priority: Minor
>  Labels: queue, rabbitmq
> Fix For: 1.1.0
>
>
> Please add functionality to customize the following line (custom arguments for 
> the queue):
> {code}channel.queueDeclare(queueName, false, false, false, null);{code}
> Thank you.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4066) RabbitMQ source, customize queue arguments

2016-06-13 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-4066.
---
Resolution: Duplicate

> RabbitMQ source, customize queue arguments
> --
>
> Key: FLINK-4066
> URL: https://issues.apache.org/jira/browse/FLINK-4066
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kanstantsin Kamkou
>Priority: Minor
>  Labels: queue, rabbitmq
> Fix For: 1.1.0
>
>
> Please add functionality to customize the following line (custom arguments for 
> the queue):
> {code}channel.queueDeclare(queueName, false, false, false, null);{code}
> Thank you.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4066) RabbitMQ source, customize queue arguments

2016-06-13 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327585#comment-15327585
 ] 

Robert Metzger commented on FLINK-4066:
---

I'm closing this JIRA as invalid. The requested feature has been added in 
FLINK-4025

> RabbitMQ source, customize queue arguments
> --
>
> Key: FLINK-4066
> URL: https://issues.apache.org/jira/browse/FLINK-4066
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kanstantsin Kamkou
>Priority: Minor
>  Labels: queue, rabbitmq
> Fix For: 1.1.0
>
>
> Please add functionality to customize the following line (custom arguments for 
> the queue):
> {code}channel.queueDeclare(queueName, false, false, false, null);{code}
> Thank you.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4064) Allow calling MetricGroup methods multiple times

2016-06-13 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327587#comment-15327587
 ] 

Aljoscha Krettek commented on FLINK-4064:
-

You're right, that might be the way to go.

> Allow calling MetricGroup methods multiple times
> 
>
> Key: FLINK-4064
> URL: https://issues.apache.org/jira/browse/FLINK-4064
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>
> Right now, the methods on {{MetricGroup}} can only be called once, i.e. I 
> have to keep the result of {{MetricGroup.counter(name)}} and use that to 
> report metrics. For some cases, such as adding metrics support in triggers, 
> it is necessary to allow calling a method multiple times and return the same 
> metric object. On the first call, a new metric object would be created; 
> subsequent calls would return that same metric object.
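
A minimal sketch of that get-or-create behavior, assuming a map keyed by metric name (hypothetical classes, not the actual MetricGroup implementation):

{code}
import java.util.concurrent.ConcurrentHashMap;

class MetricGroupSketch {
    private final ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();

    // First call creates the counter; later calls with the same
    // name return the identical instance.
    Counter counter(String name) {
        return counters.computeIfAbsent(name, n -> new Counter());
    }

    static class Counter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }
}
{code}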



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4028) AbstractAlignedProcessingTimeWindowOperator creates wrong TimeWindow

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327584#comment-15327584
 ] 

ASF GitHub Bot commented on FLINK-4028:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2097

[FLINK-4028] Create correct TimeWindow in 
AbstractAlignedProcessingTimeWindowOperator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink fix-aligned-time-window

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2097.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2097


commit 6180df6e11b7c27f62c9e25a1d7888c0b097674b
Author: Aljoscha Krettek 
Date:   2016-06-13T10:03:32Z

[FLINK-4028] Create correct TimeWindow in 
AbstractAlignedProcessingTimeWindowOperator




> AbstractAlignedProcessingTimeWindowOperator creates wrong TimeWindow
> 
>
> Key: FLINK-4028
> URL: https://issues.apache.org/jira/browse/FLINK-4028
> Project: Flink
>  Issue Type: Bug
>  Components: Windowing Operators
>Affects Versions: 1.0.0, 1.1.0, 1.0.1, 1.0.2, 1.0.3
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.1.0
>
>
> In {{computeWindow}} we have this code: {{new TimeWindow(timestamp, timestamp 
> + windowSize)}}. This is wrong because {{timestamp}} is actually the end 
> timestamp of the window.
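
In other words, the likely fix is to treat {{timestamp}} as the window end (a sketch, not the actual patch):

{code}
// "timestamp" marks the *end* of the window, so the start must be derived:
TimeWindow window = new TimeWindow(timestamp - windowSize, timestamp);
// instead of the buggy: new TimeWindow(timestamp, timestamp + windowSize)
{code}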



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4065) Unstable Kafka09ITCase.testMultipleSourcesOnePartition test

2016-06-13 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327583#comment-15327583
 ] 

Kostas Kloudas commented on FLINK-4065:
---

Thanks [~rmetzger] 

> Unstable Kafka09ITCase.testMultipleSourcesOnePartition test
> ---
>
> Key: FLINK-4065
> URL: https://issues.apache.org/jira/browse/FLINK-4065
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> Sometimes the Kafka09ITCase.testMultipleSourcesOnePartition test fails on 
> Travis. Here is a log of such a failure:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/136758707/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2097: [FLINK-4028] Create correct TimeWindow in Abstract...

2016-06-13 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2097

[FLINK-4028] Create correct TimeWindow in 
AbstractAlignedProcessingTimeWindowOperator



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink fix-aligned-time-window

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2097.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2097


commit 6180df6e11b7c27f62c9e25a1d7888c0b097674b
Author: Aljoscha Krettek 
Date:   2016-06-13T10:03:32Z

[FLINK-4028] Create correct TimeWindow in 
AbstractAlignedProcessingTimeWindowOperator




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3971) Aggregates handle null values incorrectly.

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327580#comment-15327580
 ] 

ASF GitHub Bot commented on FLINK-3971:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2049
  
Thanks for the fast update @gallenvara. Good to merge!


> Aggregates handle null values incorrectly.
> --
>
> Key: FLINK-3971
> URL: https://issues.apache.org/jira/browse/FLINK-3971
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Critical
> Fix For: 1.1.0
>
>
> Table API and SQL aggregates are supposed to ignore null values, e.g., 
> {{sum(1,2,null,4)}} is supposed to return {{7}}. 
> The current implementation is correct if at least one valid value is 
> present; however, it is incorrect if only null values are aggregated. {{sum(null, 
> null, null)}} should return {{null}} instead of {{0}}.
> Currently only the Count aggregate handles the case of null-values-only 
> correctly.
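
A sketch of the intended null semantics with a hypothetical accumulator (not the actual aggregate classes):

{code}
class NullAwareSum {
    private long sum;
    private boolean sawValue;

    void accumulate(Long value) {
        if (value != null) {   // null inputs are skipped
            sum += value;
            sawValue = true;
        }
    }

    Long getResult() {
        // sum(1, 2, null, 4) -> 7; sum(null, null, null) -> null, not 0
        return sawValue ? sum : null;
    }
}
{code}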



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

2016-06-13 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2049
  
Thanks for the fast update @gallenvara. Good to merge!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4067) Add version header to savepoints

2016-06-13 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-4067:
--

 Summary: Add version header to savepoints
 Key: FLINK-4067
 URL: https://issues.apache.org/jira/browse/FLINK-4067
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.0.3
Reporter: Ufuk Celebi
 Fix For: 1.1.0


Adding a header with version information to savepoints ensures that we can 
migrate savepoints between Flink versions in the future (for example when 
changing internal serialization formats between versions).

After talking with Till, we propose to add the following metadata:

- Magic number (int): identify data as savepoint
- Version (int): savepoint version (independent of Flink version)
- Data Offset (int): specifies at which point the actual savepoint data starts. 
With this, we can allow future Flink versions to add fields to the header 
without breaking stuff, e.g. Flink 1.1 could read savepoints of Flink 2.0.

For Flink 1.0 savepoint support, if we don't find the magic number, we have to 
try reading the savepoint without a header before failing.
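
A sketch of writing and checking such a header, with a hypothetical magic constant and the three int fields proposed above:

{code}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class SavepointHeaderSketch {
    static final int MAGIC = 0xCAFED00D; // placeholder value, not the real one

    static void writeHeader(DataOutputStream out, int version, int dataOffset) throws IOException {
        out.writeInt(MAGIC);
        out.writeInt(version);
        out.writeInt(dataOffset);
    }

    /** Returns the savepoint version, or -1 if no header was found (Flink 1.0 format). */
    static int readVersion(DataInputStream in) throws IOException {
        if (in.readInt() != MAGIC) {
            return -1; // fall back to reading a header-less Flink 1.0 savepoint
        }
        return in.readInt();
    }
}
{code}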




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3749) Improve decimal handling

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327574#comment-15327574
 ] 

ASF GitHub Bot commented on FLINK-3749:
---

Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1881


> Improve decimal handling
> 
>
> Key: FLINK-3749
> URL: https://issues.apache.org/jira/browse/FLINK-3749
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The current decimal handling is too restrictive and does not allow literals 
> such as "11.2".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3749) Improve decimal handling

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327573#comment-15327573
 ] 

ASF GitHub Bot commented on FLINK-3749:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/1881
  
Yes, I will close this...


> Improve decimal handling
> 
>
> Key: FLINK-3749
> URL: https://issues.apache.org/jira/browse/FLINK-3749
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The current decimal handling is too restrictive and does not allow literals 
> such as "11.2".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1881: [FLINK-3749] [table] Improve decimal handling

2016-06-13 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/1881
  
Yes, I will close this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #1881: [FLINK-3749] [table] Improve decimal handling

2016-06-13 Thread twalthr
Github user twalthr closed the pull request at:

https://github.com/apache/flink/pull/1881


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-4065) Unstable Kafka09ITCase.testMultipleSourcesOnePartition test

2016-06-13 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger closed FLINK-4065.
-
Resolution: Invalid

> Unstable Kafka09ITCase.testMultipleSourcesOnePartition test
> ---
>
> Key: FLINK-4065
> URL: https://issues.apache.org/jira/browse/FLINK-4065
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> Sometimes the Kafka09ITCase.testMultipleSourcesOnePartition test fails on 
> Travis. Here is a log of such a failure:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/136758707/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4065) Unstable Kafka09ITCase.testMultipleSourcesOnePartition test

2016-06-13 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327572#comment-15327572
 ] 

Robert Metzger commented on FLINK-4065:
---

4 days ago, I improved the stability of the Kafka tests a bit 
(https://issues.apache.org/jira/browse/FLINK-3530); I also changed the behavior 
of the mentioned test. From the logs it seems you've been running the tests on 
a slightly outdated master.
I'll close this issue as invalid for now. Please reopen it if the error occurs 
again.

> Unstable Kafka09ITCase.testMultipleSourcesOnePartition test
> ---
>
> Key: FLINK-4065
> URL: https://issues.apache.org/jira/browse/FLINK-4065
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> Sometimes the Kafka09ITCase.testMultipleSourcesOnePartition test fails on 
> Travis. Here is a log of such a failure:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/136758707/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327549#comment-15327549
 ] 

ASF GitHub Bot commented on FLINK-3859:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2088
  
Thanks for the PR, @twalthr! Looks mostly good. I found a few cases that 
need to be fixed.

We also should think about how to handle SQL `DECIMAL` types with fixed 
precision / scale, e.g., how to handle something like `SELECT CAST(myDouble AS 
DECIMAL(4,2))`. Do you think this could be easily added with this PR, or should 
it rather be fixed in a later issue?


> Add BigDecimal/BigInteger support to Table API
> --
>
> Key: FLINK-3859
> URL: https://issues.apache.org/jira/browse/FLINK-3859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Since FLINK-3786 has been solved, we can now start integrating 
> BigDecimal/BigInteger into the Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger support to...

2016-06-13 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2088
  
Thanks for the PR, @twalthr! Looks mostly good. I found a few cases that 
need to be fixed.

We also should think about how to handle SQL `DECIMAL` types with fixed 
precision / scale, e.g., how to handle something like `SELECT CAST(myDouble AS 
DECIMAL(4,2))`. Do you think this could be easily added with this PR, or should 
it rather be fixed in a later issue?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger sup...

2016-06-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r66807225
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/DecimalTypeTest.scala
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expression
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.expression.utils.ExpressionTestBase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.junit.Test
+
+class DecimalTypeTest extends ExpressionTestBase {
+
+  @Test
+  def testDecimalLiterals(): Unit = {
+// implicit double
+testAllApis(
+  11.2,
+  "11.2",
+  "11.2",
+  "11.2")
+
+// implicit double
+testAllApis(
+  0.7623533651719233,
+  "0.7623533651719233",
+  "0.7623533651719233",
+  "0.7623533651719233")
+
+// explicit decimal (with precision of 19)
+testAllApis(
+  BigDecimal("1234567891234567891"),
+  "1234567891234567891p",
+  "1234567891234567891",
+  "1234567891234567891")
+
+// explicit decimal (high precision, not SQL compliant)
+testTableApi(
+  BigDecimal("123456789123456789123456789"),
+  "123456789123456789123456789p",
+  "123456789123456789123456789")
+
+// explicit decimal (high precision, not SQL compliant)
+testTableApi(
+  BigDecimal("12.3456789123456789123456789"),
+  "12.3456789123456789123456789p",
+  "12.3456789123456789123456789")
+  }
+
+  @Test
+  def testDecimalBorders(): Unit = {
+testAllApis(
+  Double.MaxValue,
+  Double.MaxValue.toString,
+  Double.MaxValue.toString,
+  Double.MaxValue.toString)
+
+testAllApis(
+  Double.MinValue,
+  Double.MinValue.toString,
+  Double.MinValue.toString,
+  Double.MinValue.toString)
+
+testAllApis(
+  Double.MinValue.cast(FLOAT_TYPE_INFO),
+  s"${Double.MinValue}.cast(FLOAT)",
+  s"CAST(${Double.MinValue} AS FLOAT)",
+  Float.NegativeInfinity.toString)
+
+testAllApis(
+  Byte.MinValue.cast(BYTE_TYPE_INFO),
+  s"(${Byte.MinValue}).cast(BYTE)",
+  s"CAST(${Byte.MinValue} AS TINYINT)",
+  Byte.MinValue.toString)
+
+testAllApis(
+  Byte.MinValue.cast(BYTE_TYPE_INFO) - 1.cast(BYTE_TYPE_INFO),
+  s"(${Byte.MinValue}).cast(BYTE) - (1).cast(BYTE)",
+  s"CAST(${Byte.MinValue} AS TINYINT) - CAST(1 AS TINYINT)",
+  Byte.MaxValue.toString)
+
+testAllApis(
+  Short.MinValue.cast(SHORT_TYPE_INFO),
+  s"(${Short.MinValue}).cast(SHORT)",
+  s"CAST(${Short.MinValue} AS SMALLINT)",
+  Short.MinValue.toString)
+
+testAllApis(
+  Int.MinValue.cast(INT_TYPE_INFO) - 1,
+  s"(${Int.MinValue}).cast(INT) - 1",
+  s"CAST(${Int.MinValue} AS INT) - 1",
+  Int.MaxValue.toString)
+
+testAllApis(
+  Long.MinValue.cast(LONG_TYPE_INFO),
+  s"(${Long.MinValue}L).cast(LONG)",
+  s"CAST(${Long.MinValue} AS BIGINT)",
+  Long.MinValue.toString)
+  }
+
+  @Test
+  def testDecimalCasting(): Unit = {
+// from String
+testTableApi(
+  "123456789123456789123456789".cast(BIG_DEC_TYPE_INFO),
+  "'123456789123456789123456789'.cast(DECIMAL)",
+  "123456789123456789123456789")
+
+// from double
+testAllApis(
+  'f3.cast(BIG_DEC_TYPE_INFO),
+  "f3.cast(DECIMAL)",
+  "CAST(f3 AS DECIMAL)",
+  "4.2")
+
+// to double
 

[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327547#comment-15327547
 ] 

ASF GitHub Bot commented on FLINK-3859:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r66807225
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/DecimalTypeTest.scala
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expression
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.expression.utils.ExpressionTestBase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.junit.Test
+
+class DecimalTypeTest extends ExpressionTestBase {
+
+  @Test
+  def testDecimalLiterals(): Unit = {
+// implicit double
+testAllApis(
+  11.2,
+  "11.2",
+  "11.2",
+  "11.2")
+
+// implicit double
+testAllApis(
+  0.7623533651719233,
+  "0.7623533651719233",
+  "0.7623533651719233",
+  "0.7623533651719233")
+
+// explicit decimal (with precision of 19)
+testAllApis(
+  BigDecimal("1234567891234567891"),
+  "1234567891234567891p",
+  "1234567891234567891",
+  "1234567891234567891")
+
+// explicit decimal (high precision, not SQL compliant)
+testTableApi(
+  BigDecimal("123456789123456789123456789"),
+  "123456789123456789123456789p",
+  "123456789123456789123456789")
+
+// explicit decimal (high precision, not SQL compliant)
+testTableApi(
+  BigDecimal("12.3456789123456789123456789"),
+  "12.3456789123456789123456789p",
+  "12.3456789123456789123456789")
+  }
+
+  @Test
+  def testDecimalBorders(): Unit = {
+testAllApis(
+  Double.MaxValue,
+  Double.MaxValue.toString,
+  Double.MaxValue.toString,
+  Double.MaxValue.toString)
+
+testAllApis(
+  Double.MinValue,
+  Double.MinValue.toString,
+  Double.MinValue.toString,
+  Double.MinValue.toString)
+
+testAllApis(
+  Double.MinValue.cast(FLOAT_TYPE_INFO),
+  s"${Double.MinValue}.cast(FLOAT)",
+  s"CAST(${Double.MinValue} AS FLOAT)",
+  Float.NegativeInfinity.toString)
+
+testAllApis(
+  Byte.MinValue.cast(BYTE_TYPE_INFO),
+  s"(${Byte.MinValue}).cast(BYTE)",
+  s"CAST(${Byte.MinValue} AS TINYINT)",
+  Byte.MinValue.toString)
+
+testAllApis(
+  Byte.MinValue.cast(BYTE_TYPE_INFO) - 1.cast(BYTE_TYPE_INFO),
+  s"(${Byte.MinValue}).cast(BYTE) - (1).cast(BYTE)",
+  s"CAST(${Byte.MinValue} AS TINYINT) - CAST(1 AS TINYINT)",
+  Byte.MaxValue.toString)
+
+testAllApis(
+  Short.MinValue.cast(SHORT_TYPE_INFO),
+  s"(${Short.MinValue}).cast(SHORT)",
+  s"CAST(${Short.MinValue} AS SMALLINT)",
+  Short.MinValue.toString)
+
+testAllApis(
+  Int.MinValue.cast(INT_TYPE_INFO) - 1,
+  s"(${Int.MinValue}).cast(INT) - 1",
+  s"CAST(${Int.MinValue} AS INT) - 1",
+  Int.MaxValue.toString)
+
+testAllApis(
+  Long.MinValue.cast(LONG_TYPE_INFO),
+  s"(${Long.MinValue}L).cast(LONG)",
+  s"CAST(${Long.MinValue} AS BIGINT)",
+  Long.MinValue.toString)
+  }
+
+  @Test
+  def testDecimalCasting(): Unit = {
+// from String
+testTableApi(
+  "123456789123456789123456789".cast(BIG_DEC_TYPE_INFO),
+  "'123456789123456789123456789'.cast(DECIMAL)",
+  

[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327544#comment-15327544
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66806299
  
--- Diff: docs/apis/cli.md ---
@@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs.
configuration.
  -r,--running  Show only running programs and their 
JobIDs
  -s,--scheduledShow only scheduled programs and their 
JobIDs
+  Additional arguments if -m yarn-cluster is set:
+ -yid   YARN application ID of Flink YARN 
session to
+   connect to. Must not be set if 
JobManager HA
+   is used. In this case, JobManager RPC
+   location is automatically retrieved from
+   Zookeeper.
--- End diff --

Same as for `list` and `info`. I agree this verbosity is not very nice. 
Looking into how I can make this look nicer.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66806299
  
--- Diff: docs/apis/cli.md ---
@@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs.
configuration.
  -r,--running  Show only running programs and their 
JobIDs
  -s,--scheduledShow only scheduled programs and their 
JobIDs
+  Additional arguments if -m yarn-cluster is set:
+ -yid   YARN application ID of Flink YARN 
session to
+   connect to. Must not be set if 
JobManager HA
+   is used. In this case, JobManager RPC
+   location is automatically retrieved from
+   Zookeeper.
--- End diff --

Same as for `list` and `info`. I agree this verbosity is not very nice. 
Looking into how I can make this look nicer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327539#comment-15327539
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805960
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -181,11 +198,30 @@ public boolean getPrintStatusDuringExecution() {
}
 
/**
-* @return -1 if unknown. The maximum number of available processing 
slots at the Flink cluster
-* connected to this client.
+* Gets the current JobManager address from the Flink configuration 
(may change in case of a HA setup).
+* @return The address (host and port) of the leading JobManager
 */
-   public int getMaxSlots() {
-   return this.maxSlots;
+   public InetSocketAddress getJobManagerAddressFromConfig() {
+   try {
+   String hostName = 
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int port = 
flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+   return new InetSocketAddress(hostName, port);
--- End diff --

Fixed.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327537#comment-15327537
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2085
  
Thanks for the review! I've addressed almost all your comments and will 
ping you once the PR gets updated. Some more comments: 

> (1) When listing the yarn-session.sh cli options, I could not see the 
newly added argument

Fixed. The argument was not printed. The code now prints all available 
options.

>Starting a YARN session works. Also, connecting with a second YARN session 
works. However, if I'm stopping the session from the second session client, the 
first one does not notice this.
It would be good if all connected clients receive the messages, instead of 
the last one connected.

Yes, would be nice to have that feature but I think it is out of the scope 
of this PR.

>Detached per job submission is not working (./bin/flink run -m 
yarn-cluster -yd -yn 1 ./examples/batch/WordCount.jar)

You're running a WordCount with interactive job submission detached. That 
doesn't work because the plan can't be extracted in interactive programs 
(programs which use count/collect/print).

>While trying out your code, I noticed that the .yarn-properties file is 
not properly deleted, even though my previous yarn session was shutting down 
correctly:

I'll look into it. I know that this is also an issue in the current master. 
I had the properties file lingering around many times. I think it is not 
properly cleaned up in the ShutdownHook.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327538#comment-15327538
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805953
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java 
---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+
+/**
+ * Custom command-line interface to load hooks for the command-line 
interface.
+ */
+public interface CustomCommandLine {
+
+   /**
+* Returns a unique identifier for this custom command-line.
+* @return An unique identifier string
+*/
+   String getIdentifier();
+
+   /**
+* Adds custom options to the existing run options.
+* @param baseOptions The existing options.
+*/
+   void addRunOptions(Options baseOptions);
+
+   /**
+* Adds custom options to the existing general options.
+* @param baseOptions The existing options.
+*/
+   void addGeneralOptions(Options baseOptions);
+
+   /**
+* Retrieves a client for a running cluster
+* @param commandLine The command-line parameters from the CliFrontend
+* @param config The Flink config
+* @return Client if a cluster could be retrieve, null otherwise
--- End diff --

Thanks.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805960
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -181,11 +198,30 @@ public boolean getPrintStatusDuringExecution() {
}
 
/**
-* @return -1 if unknown. The maximum number of available processing 
slots at the Flink cluster
-* connected to this client.
+* Gets the current JobManager address from the Flink configuration 
(may change in case of a HA setup).
+* @return The address (host and port) of the leading JobManager
 */
-   public int getMaxSlots() {
-   return this.maxSlots;
+   public InetSocketAddress getJobManagerAddressFromConfig() {
+   try {
+   String hostName = 
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int port = 
flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+   return new InetSocketAddress(hostName, port);
--- End diff --

Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805953
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java 
---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+
+/**
+ * Custom command-line interface to load hooks for the command-line 
interface.
+ */
+public interface CustomCommandLine {
+
+   /**
+* Returns a unique identifier for this custom command-line.
+* @return An unique identifier string
+*/
+   String getIdentifier();
+
+   /**
+* Adds custom options to the existing run options.
+* @param baseOptions The existing options.
+*/
+   void addRunOptions(Options baseOptions);
+
+   /**
+* Adds custom options to the existing general options.
+* @param baseOptions The existing options.
+*/
+   void addGeneralOptions(Options baseOptions);
+
+   /**
+* Retrieves a client for a running cluster
+* @param commandLine The command-line parameters from the CliFrontend
+* @param config The Flink config
+* @return Client if a cluster could be retrieve, null otherwise
--- End diff --

Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327536#comment-15327536
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805706
  
--- Diff: docs/setup/yarn_setup.md ---
@@ -143,6 +143,34 @@ Note that in this case its not possible to stop the 
YARN session using Flink.
 
 Use the YARN utilities (`yarn application -kill `) to stop the YARN 
session.
 
+ Attach to an existing Session
+
+Use the following command to start a session
+
+~~~bash
+./bin/yarn-session.sh
+~~~
+
+This command will show you the following overview:
+
+~~~bash
+Usage:
+   Required
+ -id,--applicationId  YARN application Id
--- End diff --

Yes, same as above. Fixed.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2085
  
Thanks for the review! I've addressed almost all your comments and will 
ping you once the PR gets updated. Some more comments: 

> (1) When listing the yarn-session.sh cli options, I could not see the 
newly added argument

Fixed. The argument was not printed. The code now prints all available 
options.

>Starting a YARN session works. Also, connecting with a second YARN session 
works. However, if I'm stopping the session from the second session client, the 
first one does not notice this.
It would be good if all connected clients receive the messages, instead of 
the last one connected.

Yes, would be nice to have that feature but I think it is out of the scope 
of this PR.

>Detached per job submission is not working (./bin/flink run -m 
yarn-cluster -yd -yn 1 ./examples/batch/WordCount.jar)

You're running a WordCount with interactive job submission detached. That 
doesn't work because the plan can't be extracted in interactive programs 
(programs which use count/collect/print).

>While trying out your code, I noticed that the .yarn-properties file is 
not properly deleted, even though my previous yarn session was shutting down 
correctly:

I'll look into it. I know that this is also an issue in the current master. 
I had the properties file lingering around many times. I think it is not 
properly cleaned up in the ShutdownHook.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327535#comment-15327535
 ] 

ASF GitHub Bot commented on FLINK-3859:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r66805682
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -122,14 +125,14 @@ object ScalarOperators {
   right: GeneratedExpression)
 : GeneratedExpression = {
 generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-  if (isString(left) && isString(right)) {
+  if (isReference(left) && isReference(right)) {
 (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) 
$operator 0"
   }
-  else if (isNumeric(left) && isNumeric(right)) {
+  else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
--- End diff --

If `left` or `right` is a `BigDecimal`, invalid code is generated. 

> Caused by: org.codehaus.commons.compiler.CompileException: Line 64, 
Column 0: Cannot compare types "int" and "java.math.BigDecimal"

This happens if you replace the query in 
`FilterITCase.testFilterOnInteger()` by `SELECT * FROM MyTable WHERE a < 4.25`. 
`MyTable.a` is an `INTEGER` column and not auto-casted to `DECIMAL`. 
Auto-casting seems to work for expressions in the `SELECT` clause.
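
For illustration, the shape of the problem in plain Java (hypothetical terms, not the generated code itself):

```java
import java.math.BigDecimal;

class CompareSketch {
    static boolean lessThan(int a, BigDecimal literal) {
        // return a < literal;  // does not compile: "int" vs "java.math.BigDecimal"
        return BigDecimal.valueOf(a).compareTo(literal) < 0; // convert first, then compare
    }
}
```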


> Add BigDecimal/BigInteger support to Table API
> --
>
> Key: FLINK-3859
> URL: https://issues.apache.org/jira/browse/FLINK-3859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Since FLINK-3786 has been solved, we can now start integrating 
> BigDecimal/BigInteger into the Table API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805706
  
--- Diff: docs/setup/yarn_setup.md ---
@@ -143,6 +143,34 @@ Note that in this case its not possible to stop the 
YARN session using Flink.
 
 Use the YARN utilities (`yarn application -kill `) to stop the YARN 
session.
 
+ Attach to an existing Session
+
+Use the following command to start a session
+
+~~~bash
+./bin/yarn-session.sh
+~~~
+
+This command will show you the following overview:
+
+~~~bash
+Usage:
+   Required
+ -id,--applicationId  YARN application Id
--- End diff --

Yes, same as above. Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger sup...

2016-06-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r66805682
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -122,14 +125,14 @@ object ScalarOperators {
   right: GeneratedExpression)
 : GeneratedExpression = {
 generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-  if (isString(left) && isString(right)) {
+  if (isReference(left) && isReference(right)) {
 (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) 
$operator 0"
   }
-  else if (isNumeric(left) && isNumeric(right)) {
+  else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
--- End diff --

If `left` or `right` is a `BigDecimal`, invalid code is generated. 

> Caused by: org.codehaus.commons.compiler.CompileException: Line 64, 
Column 0: Cannot compare types "int" and "java.math.BigDecimal"

This happens if you replace the query in 
`FilterITCase.testFilterOnInteger()` by `SELECT * FROM MyTable WHERE a < 4.25`. 
`MyTable.a` is an `INTEGER` column and not auto-casted to `DECIMAL`. 
Auto-casting seems to work for expressions in the `SELECT` clause.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327534#comment-15327534
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805499
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -300,18 +309,82 @@ public static boolean allocateResource(int[] 
nodeManagers, int toAllocate) {
return false;
}
 
-   @Override
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
 
-   @Override
-   public boolean isDetached() {
+   public boolean isDetachedMode() {
return detached;
}
 
+
+   /**
+* Gets a Hadoop Yarn client
+* @return Returns a YarnClient which has to be shutdown manually
+*/
+   private static YarnClient getYarnClient(Configuration conf) {
+   YarnClient yarnClient = YarnClient.createYarnClient();
+   yarnClient.init(conf);
+   yarnClient.start();
+   return yarnClient;
+   }
+
+   /**
+* Retrieves the Yarn application and cluster from the config
+* @param config The config with entries to retrieve the cluster
+* @return YarnClusterClient
+* @deprecated This should be removed in the future
+*/
+   public YarnClusterClient 
retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws  
Exception {
+   String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+   if (jobManagerHost != null && jobManagerPort != -1) {
+
+   YarnClient yarnClient = getYarnClient(conf);
+   List applicationReports = 
yarnClient.getApplications();
+   for (ApplicationReport report : applicationReports) {
+   if (report.getHost().equals(jobManagerHost) && 
report.getRpcPort() == jobManagerPort) {
+   LOG.info("Found application '{}' " +
+   "with JobManager host name '{}' 
and port '{}' from Yarn properties file.",
+   report.getApplicationId(), 
jobManagerHost, jobManagerPort);
+   return 
retrieve(report.getApplicationId().toString());
+   }
+   }
+
+   }
+   return null;
--- End diff --

Same as above. Fixed.
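
For context, the Javadoc in the quoted diff notes that the `YarnClient` has 
to be shut down manually, yet the quoted loop returns without stopping it. 
A minimal sketch of closing it on every path, assuming the fix goes in this 
direction (class and method names here are hypothetical):

```java
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class YarnClientShutdownSketch {
    // Finds the application backing a given JobManager host/port,
    // always stopping the client afterwards.
    static ApplicationReport findByJobManager(Configuration conf, String host, int port) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();
        try {
            List<ApplicationReport> reports = yarnClient.getApplications();
            for (ApplicationReport report : reports) {
                if (report.getHost().equals(host) && report.getRpcPort() == port) {
                    return report;
                }
            }
            return null;
        } finally {
            yarnClient.stop(); // the manual shutdown the Javadoc calls for
        }
    }
}
```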


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327530#comment-15327530
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805417
  
--- Diff: docs/apis/cli.md ---
@@ -105,6 +105,10 @@ The command line can be used to
 
 ./bin/flink list -r
 
+-   List running Flink jobs inside Flink YARN session:
+
+./bin/flink list -m yarn-cluster -yid  -r
--- End diff --

Thank you, I'll look into this.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327533#comment-15327533
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805492
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -300,18 +309,82 @@ public static boolean allocateResource(int[] 
nodeManagers, int toAllocate) {
return false;
}
 
-   @Override
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
 
-   @Override
-   public boolean isDetached() {
+   public boolean isDetachedMode() {
return detached;
}
 
+
+   /**
+* Gets a Hadoop Yarn client
+* @return Returns a YarnClient which has to be shutdown manually
+*/
+   private static YarnClient getYarnClient(Configuration conf) {
+   YarnClient yarnClient = YarnClient.createYarnClient();
+   yarnClient.init(conf);
+   yarnClient.start();
+   return yarnClient;
+   }
+
+   /**
+* Retrieves the Yarn application and cluster from the config
+* @param config The config with entries to retrieve the cluster
+* @return YarnClusterClient
+* @deprecated This should be removed in the future
+*/
+   public YarnClusterClient 
retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws  
Exception {
+   String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+   if (jobManagerHost != null && jobManagerPort != -1) {
+
+   YarnClient yarnClient = getYarnClient(conf);
+   List applicationReports = 
yarnClient.getApplications();
+   for (ApplicationReport report : applicationReports) {
+   if (report.getHost().equals(jobManagerHost) && 
report.getRpcPort() == jobManagerPort) {
+   LOG.info("Found application '{}' " +
+   "with JobManager host name '{}' 
and port '{}' from Yarn properties file.",
+   report.getApplicationId(), 
jobManagerHost, jobManagerPort);
+   return 
retrieve(report.getApplicationId().toString());
+   }
+   }
+
--- End diff --

Good idea.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327531#comment-15327531
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805455
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -415,62 +513,83 @@ public int run(String[] args) {
}
System.out.println(description);
return 0;
+   } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { // TODO RM
--- End diff --

Sorry, I added this TODO while fixing some of your first comments.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805492
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -300,18 +309,82 @@ public static boolean allocateResource(int[] 
nodeManagers, int toAllocate) {
return false;
}
 
-   @Override
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
 
-   @Override
-   public boolean isDetached() {
+   public boolean isDetachedMode() {
return detached;
}
 
+
+   /**
+* Gets a Hadoop Yarn client
+* @return Returns a YarnClient which has to be shutdown manually
+*/
+   private static YarnClient getYarnClient(Configuration conf) {
+   YarnClient yarnClient = YarnClient.createYarnClient();
+   yarnClient.init(conf);
+   yarnClient.start();
+   return yarnClient;
+   }
+
+   /**
+* Retrieves the Yarn application and cluster from the config
+* @param config The config with entries to retrieve the cluster
+* @return YarnClusterClient
+* @deprecated This should be removed in the future
+*/
+   public YarnClusterClient 
retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws  
Exception {
+   String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+   if (jobManagerHost != null && jobManagerPort != -1) {
+
+   YarnClient yarnClient = getYarnClient(conf);
+   List applicationReports = 
yarnClient.getApplications();
+   for (ApplicationReport report : applicationReports) {
+   if (report.getHost().equals(jobManagerHost) && 
report.getRpcPort() == jobManagerPort) {
+   LOG.info("Found application '{}' " +
+   "with JobManager host name '{}' 
and port '{}' from Yarn properties file.",
+   report.getApplicationId(), 
jobManagerHost, jobManagerPort);
+   return 
retrieve(report.getApplicationId().toString());
+   }
+   }
+
--- End diff --

Good idea.




[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327532#comment-15327532
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805479
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,18 +185,45 @@ public void run() {
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
isConnected = true;
+
+   logAndSysout("Waiting until all TaskManagers have connected");
+
+   while(true) {
--- End diff --

Yes, I wanted to bring up the Yarn session only once the cluster is ready. 
It is a semantic change, but IMHO transparent to the user. There is no 
disadvantage to waiting until the cluster is ready. Ultimately, it would be 
nice to get rid of all waiting, but we're not quite there yet.
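
A minimal sketch of what a bounded wait could look like, replacing the 
quoted `while(true)`; the status interface and timeout are illustrative 
assumptions, not the actual client API:

```java
import java.time.Duration;

public class BoundedClusterWaitSketch {
    interface ClusterStatus {          // hypothetical view of the cluster
        int connectedTaskManagers();
        int expectedTaskManagers();
    }

    static void waitForTaskManagers(ClusterStatus status, Duration timeout) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (status.connectedTaskManagers() < status.expectedTaskManagers()) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("Timed out waiting for TaskManagers to connect");
            }
            Thread.sleep(500); // poll interval, as the client does today
        }
    }
}
```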


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327529#comment-15327529
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805382
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,18 +185,45 @@ public void run() {
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
isConnected = true;
+
+   logAndSysout("Waiting until all TaskManagers have connected");
--- End diff --

Thank you. I fixed that by using a non-static variable for the logger and 
dynamically retrieving the class name, i.e.
```java
private final Logger LOG = LoggerFactory.getLogger(getClass());
```
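
A minimal sketch of why the instance logger matters: with `getClass()`, a 
subclass logs under its own name rather than the abstract parent's. Class 
names below are illustrative, not the actual Flink hierarchy:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractDescriptorSketch {
    // Resolved per instance, so subclasses log under their own class name;
    // a static logger pinned to the abstract class would not.
    protected final Logger log = LoggerFactory.getLogger(getClass());
}

class YarnDescriptorSketch extends AbstractDescriptorSketch {
    void connect() {
        log.info("connecting"); // logged as YarnDescriptorSketch
    }
}
```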


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805499
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -300,18 +309,82 @@ public static boolean allocateResource(int[] 
nodeManagers, int toAllocate) {
return false;
}
 
-   @Override
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
 
-   @Override
-   public boolean isDetached() {
+   public boolean isDetachedMode() {
return detached;
}
 
+
+   /**
+* Gets a Hadoop Yarn client
+* @return Returns a YarnClient which has to be shutdown manually
+*/
+   private static YarnClient getYarnClient(Configuration conf) {
+   YarnClient yarnClient = YarnClient.createYarnClient();
+   yarnClient.init(conf);
+   yarnClient.start();
+   return yarnClient;
+   }
+
+   /**
+* Retrieves the Yarn application and cluster from the config
+* @param config The config with entries to retrieve the cluster
+* @return YarnClusterClient
+* @deprecated This should be removed in the future
+*/
+   public YarnClusterClient 
retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws  
Exception {
+   String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+   if (jobManagerHost != null && jobManagerPort != -1) {
+
+   YarnClient yarnClient = getYarnClient(conf);
+   List applicationReports = 
yarnClient.getApplications();
+   for (ApplicationReport report : applicationReports) {
+   if (report.getHost().equals(jobManagerHost) && 
report.getRpcPort() == jobManagerPort) {
+   LOG.info("Found application '{}' " +
+   "with JobManager host name '{}' 
and port '{}' from Yarn properties file.",
+   report.getApplicationId(), 
jobManagerHost, jobManagerPort);
+   return 
retrieve(report.getApplicationId().toString());
+   }
+   }
+
+   }
+   return null;
--- End diff --

Same as above. Fixed.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805479
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,18 +185,45 @@ public void run() {
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
isConnected = true;
+
+   logAndSysout("Waiting until all TaskManagers have connected");
+
+   while(true) {
--- End diff --

Yes, I wanted to bring up the Yarn session only once the cluster is ready. 
It is a semantic change, but IMHO transparent to the user. There is no 
disadvantage to waiting until the cluster is ready. Ultimately, it would be 
nice to get rid of all waiting, but we're not quite there yet.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805455
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -415,62 +513,83 @@ public int run(String[] args) {
}
System.out.println(description);
return 0;
+   } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { // TODO RM
--- End diff --

Sorry, I added this TODO while fixing some of your first comments.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805382
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,18 +185,45 @@ public void run() {
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
isConnected = true;
+
+   logAndSysout("Waiting until all TaskManagers have connected");
--- End diff --

Thank you. I fixed that by using a non-static variable for the logger and 
dynamically retrieving the class name, i.e.
```java
private final Logger LOG = LoggerFactory.getLogger(getClass());
```




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805417
  
--- Diff: docs/apis/cli.md ---
@@ -105,6 +105,10 @@ The command line can be used to
 
 ./bin/flink list -r
 
+-   List running Flink jobs inside Flink YARN session:
+
+./bin/flink list -m yarn-cluster -yid  -r
--- End diff --

Thank you, I'll look into this.




[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327524#comment-15327524
 ] 

ASF GitHub Bot commented on FLINK-3859:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r66804777
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
 ---
@@ -143,15 +143,30 @@ object ScalarFunctions {
 new MultiTypeMethodCallGen(BuiltInMethods.ABS))
 
   addSqlFunction(
+ABS,
+Seq(BIG_DEC_TYPE_INFO),
+new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC))
+
+  addSqlFunction(
--- End diff --

Just wondering, why are there no functions for `FLOAT`? Are they handled by 
the `DOUBLE` functions?
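
If the answer is widening, a minimal Java sketch of the mechanism; whether 
the Table API codegen actually relies on this is an assumption:

```java
public class FloatWideningSketch {
    static double half(double x) {  // only a double overload exists
        return x / 2;
    }

    public static void main(String[] args) {
        float f = 3.0f;
        // The float argument widens to double at the call site, so a single
        // DOUBLE implementation can serve FLOAT inputs as well.
        System.out.println(half(f)); // 1.5
    }
}
```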


> Add BigDecimal/BigInteger support to Table API
> --
>
> Key: FLINK-3859
> URL: https://issues.apache.org/jira/browse/FLINK-3859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Since FLINK-3786 has been solved, we can now start integrating 
> BigDecimal/BigInteger into the Table API.





[GitHub] flink pull request #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger sup...

2016-06-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r66804777
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
 ---
@@ -143,15 +143,30 @@ object ScalarFunctions {
 new MultiTypeMethodCallGen(BuiltInMethods.ABS))
 
   addSqlFunction(
+ABS,
+Seq(BIG_DEC_TYPE_INFO),
+new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC))
+
+  addSqlFunction(
--- End diff --

Just wondering, why are there no functions for `FLOAT`? Are they handled by 
the `DOUBLE` functions?




[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327518#comment-15327518
 ] 

ASF GitHub Bot commented on FLINK-3859:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r66804568
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
 ---
@@ -97,11 +97,26 @@ object CodeGenUtils {
 case _ => "null"
   }
 
-  def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType 
match {
-case nti: NumericTypeInfo[_] => // ok
-case _ => throw new CodeGenException("Numeric expression type 
expected.")
+  def superPrimitive(typeInfo: TypeInformation[_]): String = typeInfo 
match {
+case _: FractionalTypeInfo[_] => "double"
+case _ => "long"
   }
 
+  // 
--
+
+  def requireNumeric(genExpr: GeneratedExpression) =
+if (!TypeCheckUtils.isNumeric(genExpr.resultType)) {
+  throw new CodeGenException("Numeric expression type expected, but 
was " +
+s"'${genExpr.resultType}'")
+}
+
+  def requireNumericOrComparable(genExpr: GeneratedExpression) =
--- End diff --

Rename to `requireComparable()`?


> Add BigDecimal/BigInteger support to Table API
> --
>
> Key: FLINK-3859
> URL: https://issues.apache.org/jira/browse/FLINK-3859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Since FLINK-3786 has been solved, we can now start integrating 
> BigDecimal/BigInteger into the Table API.





[GitHub] flink pull request #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger sup...

2016-06-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2088#discussion_r66804568
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
 ---
@@ -97,11 +97,26 @@ object CodeGenUtils {
 case _ => "null"
   }
 
-  def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType 
match {
-case nti: NumericTypeInfo[_] => // ok
-case _ => throw new CodeGenException("Numeric expression type 
expected.")
+  def superPrimitive(typeInfo: TypeInformation[_]): String = typeInfo 
match {
+case _: FractionalTypeInfo[_] => "double"
+case _ => "long"
   }
 
+  // 
--
+
+  def requireNumeric(genExpr: GeneratedExpression) =
+if (!TypeCheckUtils.isNumeric(genExpr.resultType)) {
+  throw new CodeGenException("Numeric expression type expected, but 
was " +
+s"'${genExpr.resultType}'")
+}
+
+  def requireNumericOrComparable(genExpr: GeneratedExpression) =
--- End diff --

Rename to `requireComparable()`?




[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327501#comment-15327501
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2085
  
Thanks a lot for adding this feature. I looked through the code and tried 
it out on a cluster.
Once my concerns are addressed (either through changes or comments) we can 
merge the change.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[GitHub] flink issue #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2085
  
Thanks a lot for adding this feature. I looked through the code and tried 
it out on a cluster.
Once my concerns are addressed (either through changes or comments) we can 
merge the change.




[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327495#comment-15327495
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66801768
  
--- Diff: docs/setup/yarn_setup.md ---
@@ -143,6 +143,34 @@ Note that in this case its not possible to stop the 
YARN session using Flink.
 
 Use the YARN utilities (`yarn application -kill `) to stop the YARN 
session.
 
+ Attach to an existing Session
+
+Use the following command to start a session
+
+~~~bash
+./bin/yarn-session.sh
+~~~
+
+This command will show you the following overview:
+
+~~~bash
+Usage:
+   Required
+ -id,--applicationId  YARN application Id
--- End diff --

I think this option is also not listed when running ./bin/yarn-session.sh.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66801768
  
--- Diff: docs/setup/yarn_setup.md ---
@@ -143,6 +143,34 @@ Note that in this case its not possible to stop the 
YARN session using Flink.
 
 Use the YARN utilities (`yarn application -kill `) to stop the YARN 
session.
 
+ Attach to an existing Session
+
+Use the following command to start a session
+
+~~~bash
+./bin/yarn-session.sh
+~~~
+
+This command will show you the following overview:
+
+~~~bash
+Usage:
+   Required
+ -id,--applicationId  YARN application Id
--- End diff --

I think this option is also not listed when running ./bin/yarn-session.sh.




[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327491#comment-15327491
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2085
  
Detached per-job submission is not working (`./bin/flink run -m 
yarn-cluster -yd -yn 1 ./examples/batch/WordCount.jar`)

```
2016-06-13 07:28:22,423 INFO  org.apache.flink.yarn.ApplicationClient   
- Trying to register at JobManager 
akka.tcp://flink@10.0.2.15:45521/user/jobmanager.
2016-06-13 07:28:22,829 INFO  org.apache.flink.yarn.ApplicationClient   
- Successfully registered at the ResourceManager using 
JobManager Actor[akka.tcp://flink@10.0.2.15:45521/user/jobmanager#1868998090]
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
All TaskManagers are connected
Cluster started
Using address /10.0.2.15:45521 to connect to JobManager.
JobManager web interface address 
http://quickstart.cloudera:8088/proxy/application_1447844011707_0039/
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.


 The program finished with the following exception:

Job was submitted in detached mode. Results of job execution, such as 
accumulators, runtime, job id etc. are not available. Please make sure your 
program doesn't call an eager execution function [collect, print, printToErr, 
count]. 

org.apache.flink.client.program.DetachedEnvironment$DetachedJobExecutionResult.getAccumulatorResult(DetachedEnvironment.java:103)
org.apache.flink.api.java.DataSet.collect(DataSet.java:412)
org.apache.flink.api.java.DataSet.print(DataSet.java:1605)

org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:92)
2016-06-13 07:28:27,206 INFO  org.apache.flink.yarn.YarnClusterClient   
- Disconnecting YarnClusterClient from ApplicationMaster
2016-06-13 07:28:27,208 INFO  org.apache.flink.yarn.ApplicationClient   
- Stopped Application client.
2016-06-13 07:28:27,208 INFO  org.apache.flink.yarn.ApplicationClient   
- Disconnect from JobManager 
Actor[akka.tcp://flink@10.0.2.15:45521/user/jobmanager#1868998090].
2016-06-13 07:28:27,307 INFO  org.apache.flink.yarn.YarnClusterClient   
- Application application_1447844011707_0039 finished with 
state RUNNING and final state UNDEFINED at 0
[cloudera@quickstart build-target]
```

In the JobManager logs, there is no sign of a job being submitted.
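
For reference, a minimal sketch of a detached-safe variant: the error above 
comes from the eager `print()` (which calls `collect()`), so writing to a 
lazy file sink and calling `execute()` avoids it. Paths are placeholders:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DetachedSafeSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.readTextFile("hdfs:///input/words.txt"); // placeholder input
        // ... word count transformations as in the bundled example ...
        text.writeAsText("hdfs:///output/wordcount");                       // lazy sink, works detached
        env.execute("WordCount (detached-safe)");
    }
}
```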


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[GitHub] flink issue #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2085
  
Detached per-job submission is not working (`./bin/flink run -m 
yarn-cluster -yd -yn 1 ./examples/batch/WordCount.jar`)

```
2016-06-13 07:28:22,423 INFO  org.apache.flink.yarn.ApplicationClient   
- Trying to register at JobManager 
akka.tcp://flink@10.0.2.15:45521/user/jobmanager.
2016-06-13 07:28:22,829 INFO  org.apache.flink.yarn.ApplicationClient   
- Successfully registered at the ResourceManager using 
JobManager Actor[akka.tcp://flink@10.0.2.15:45521/user/jobmanager#1868998090]
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
All TaskManagers are connected
Cluster started
Using address /10.0.2.15:45521 to connect to JobManager.
JobManager web interface address 
http://quickstart.cloudera:8088/proxy/application_1447844011707_0039/
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.


 The program finished with the following exception:

Job was submitted in detached mode. Results of job execution, such as 
accumulators, runtime, job id etc. are not available. Please make sure your 
program doesn't call an eager execution function [collect, print, printToErr, 
count]. 

org.apache.flink.client.program.DetachedEnvironment$DetachedJobExecutionResult.getAccumulatorResult(DetachedEnvironment.java:103)
org.apache.flink.api.java.DataSet.collect(DataSet.java:412)
org.apache.flink.api.java.DataSet.print(DataSet.java:1605)

org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:92)
2016-06-13 07:28:27,206 INFO  org.apache.flink.yarn.YarnClusterClient   
- Disconnecting YarnClusterClient from ApplicationMaster
2016-06-13 07:28:27,208 INFO  org.apache.flink.yarn.ApplicationClient   
- Stopped Application client.
2016-06-13 07:28:27,208 INFO  org.apache.flink.yarn.ApplicationClient   
- Disconnect from JobManager 
Actor[akka.tcp://flink@10.0.2.15:45521/user/jobmanager#1868998090].
2016-06-13 07:28:27,307 INFO  org.apache.flink.yarn.YarnClusterClient   
- Application application_1447844011707_0039 finished with 
state RUNNING and final state UNDEFINED at 0
[cloudera@quickstart build-target]
```

In the JobManager logs, there is no sign of a job being submitted.




[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327487#comment-15327487
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2085
  
While trying out your code, I noticed that the `.yarn-properties` file is 
not properly deleted, even though my previous yarn session had shut down 
correctly:

```
2016-06-13 07:16:51,885 INFO  org.apache.flink.client.program.ClusterClient 
- TaskManager status (0/1)
TaskManager status (0/1)
2016-06-13 07:16:52,386 INFO  org.apache.flink.client.program.ClusterClient 
- TaskManager status (0/1)
TaskManager status (0/1)
2016-06-13 07:16:52,887 INFO  org.apache.flink.client.program.ClusterClient 
- All TaskManagers are connected
All TaskManagers are connected
2016-06-13 07:16:52,887 INFO  org.apache.flink.client.program.ClusterClient 
- Looking up JobManager
2016-06-13 07:16:52,912 INFO  org.apache.flink.client.program.ClusterClient 
- Looking up JobManager
Flink JobManager is now running on 10.0.2.15:51747
JobManager Web Interface: 
http://quickstart.cloudera:8088/proxy/application_1447844011707_0038/
Number of connected TaskManagers changed to 1. Slots available: 1
^[[A^C2016-06-13 07:25:35,370 INFO  org.apache.flink.yarn.YarnClusterClient 
  - Shutting down YarnClusterClient from the client 
shutdown hook
2016-06-13 07:25:35,372 INFO  org.apache.flink.yarn.YarnClusterClient   
- Sending shutdown request to the Application Master
2016-06-13 07:25:35,373 INFO  org.apache.flink.yarn.ApplicationClient   
- Sending StopCluster request to JobManager.
2016-06-13 07:25:35,429 INFO  org.apache.flink.yarn.ApplicationClient   
- Stopped Application client.
2016-06-13 07:25:35,431 INFO  org.apache.flink.yarn.ApplicationClient   
- Disconnect from JobManager 
Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764].
2016-06-13 07:25:35,469 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down 
remote daemon.
2016-06-13 07:25:35,469 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon 
shut down; proceeding with flushing remote transports.
2016-06-13 07:25:35,622 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut 
down.
2016-06-13 07:25:35,701 INFO  org.apache.flink.yarn.YarnClusterClient   
- Deleting files in 
hdfs://quickstart.cloudera:8020/user/cloudera/.flink/application_1447844011707_0038
2016-06-13 07:25:35,706 INFO  org.apache.flink.yarn.YarnClusterClient   
- Application application_1447844011707_0038 finished with 
state FINISHED and final state SUCCEEDED at 1465827935399
2016-06-13 07:25:36,567 INFO  org.apache.flink.yarn.YarnClusterClient   
- YARN Client is shutting down
(reverse-i-search)`yarn': ./bin/^Crn-session.sh -n 1
[cloudera@quickstart build-target]$ ./bin/flink run -y yarn-cluster -yd -yn 
1 ./examples/batch/WordCount.jar 
JAR file does not exist: -y

Use the help option (-h or --help) to get help on the command.
[cloudera@quickstart build-target]$ ./bin/flink run -m yarn-cluster -yd -yn 
1 ./examples/batch/WordCount.jar 
2016-06-13 07:26:11,057 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Found YARN properties file /tmp/.yarn-properties-cloudera
2016-06-13 07:26:11,057 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Found YARN properties file /tmp/.yarn-properties-cloudera
Found YARN properties file /tmp/.yarn-properties-cloudera
2016-06-13 07:26:11,082 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Using JobManager address from YARN properties 
quickstart.cloudera/10.0.2.15:51747
2016-06-13 07:26:11,082 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Using JobManager address from YARN properties 
quickstart.cloudera/10.0.2.15:51747
Using JobManager address from YARN properties 
quickstart.cloudera/10.0.2.15:51747
2016-06-13 07:26:11,270 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at 
quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:26:11,795 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Found application 'application_1447844011707_0038' with 
JobManager host name 'quickstart.cloudera' and port '51747' from Yarn 
properties file.
2016-06-13 07:26:11,842 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at 
quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:26:11,878 ERROR org.apache.flink.yarn.YarnClusterDescriptor   
- The application application_1447844011707_0038 doesn't run 
anymore. It has previously completed with final status: SUCCEEDED
```

[GitHub] flink issue #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2085
  
While trying out your code, I noticed that the `.yarn-properties` file is 
not properly deleted, even though my previous yarn session had shut down 
correctly:

```
2016-06-13 07:16:51,885 INFO  org.apache.flink.client.program.ClusterClient 
- TaskManager status (0/1)
TaskManager status (0/1)
2016-06-13 07:16:52,386 INFO  org.apache.flink.client.program.ClusterClient 
- TaskManager status (0/1)
TaskManager status (0/1)
2016-06-13 07:16:52,887 INFO  org.apache.flink.client.program.ClusterClient 
- All TaskManagers are connected
All TaskManagers are connected
2016-06-13 07:16:52,887 INFO  org.apache.flink.client.program.ClusterClient 
- Looking up JobManager
2016-06-13 07:16:52,912 INFO  org.apache.flink.client.program.ClusterClient 
- Looking up JobManager
Flink JobManager is now running on 10.0.2.15:51747
JobManager Web Interface: 
http://quickstart.cloudera:8088/proxy/application_1447844011707_0038/
Number of connected TaskManagers changed to 1. Slots available: 1
^[[A^C2016-06-13 07:25:35,370 INFO  org.apache.flink.yarn.YarnClusterClient 
  - Shutting down YarnClusterClient from the client 
shutdown hook
2016-06-13 07:25:35,372 INFO  org.apache.flink.yarn.YarnClusterClient   
- Sending shutdown request to the Application Master
2016-06-13 07:25:35,373 INFO  org.apache.flink.yarn.ApplicationClient   
- Sending StopCluster request to JobManager.
2016-06-13 07:25:35,429 INFO  org.apache.flink.yarn.ApplicationClient   
- Stopped Application client.
2016-06-13 07:25:35,431 INFO  org.apache.flink.yarn.ApplicationClient   
- Disconnect from JobManager 
Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764].
2016-06-13 07:25:35,469 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down 
remote daemon.
2016-06-13 07:25:35,469 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon 
shut down; proceeding with flushing remote transports.
2016-06-13 07:25:35,622 INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut 
down.
2016-06-13 07:25:35,701 INFO  org.apache.flink.yarn.YarnClusterClient   
- Deleting files in 
hdfs://quickstart.cloudera:8020/user/cloudera/.flink/application_1447844011707_0038
2016-06-13 07:25:35,706 INFO  org.apache.flink.yarn.YarnClusterClient   
- Application application_1447844011707_0038 finished with 
state FINISHED and final state SUCCEEDED at 1465827935399
2016-06-13 07:25:36,567 INFO  org.apache.flink.yarn.YarnClusterClient   
- YARN Client is shutting down
(reverse-i-search)`yarn': ./bin/^Crn-session.sh -n 1
[cloudera@quickstart build-target]$ ./bin/flink run -y yarn-cluster -yd -yn 
1 ./examples/batch/WordCount.jar 
JAR file does not exist: -y

Use the help option (-h or --help) to get help on the command.
[cloudera@quickstart build-target]$ ./bin/flink run -m yarn-cluster -yd -yn 
1 ./examples/batch/WordCount.jar 
2016-06-13 07:26:11,057 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Found YARN properties file /tmp/.yarn-properties-cloudera
2016-06-13 07:26:11,057 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Found YARN properties file /tmp/.yarn-properties-cloudera
Found YARN properties file /tmp/.yarn-properties-cloudera
2016-06-13 07:26:11,082 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Using JobManager address from YARN properties 
quickstart.cloudera/10.0.2.15:51747
2016-06-13 07:26:11,082 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Using JobManager address from YARN properties 
quickstart.cloudera/10.0.2.15:51747
Using JobManager address from YARN properties 
quickstart.cloudera/10.0.2.15:51747
2016-06-13 07:26:11,270 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at 
quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:26:11,795 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Found application 'application_1447844011707_0038' with 
JobManager host name 'quickstart.cloudera' and port '51747' from Yarn 
properties file.
2016-06-13 07:26:11,842 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at 
quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:26:11,878 ERROR org.apache.flink.yarn.YarnClusterDescriptor   
- The application application_1447844011707_0038 doesn't run 
anymore. It has previously completed with final status: SUCCEEDED
```
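
A minimal sketch of the cleanup the log suggests is missing: deleting the 
per-user properties file when the session shuts down. The file name pattern 
matches the log output above; wiring this into the actual client shutdown 
hook is an assumption:

```java
import java.io.File;

public class YarnPropertiesCleanupSketch {
    public static void main(String[] args) {
        // e.g. /tmp/.yarn-properties-cloudera, as seen in the log above
        File props = new File("/tmp/.yarn-properties-" + System.getProperty("user.name"));
        if (props.exists() && !props.delete()) {
            System.err.println("Could not delete " + props);
        }
    }
}
```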


[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66800124
  
--- Diff: docs/apis/cli.md ---
@@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs.
configuration.
  -r,--running  Show only running programs and their 
JobIDs
  -s,--scheduledShow only scheduled programs and their 
JobIDs
+  Additional arguments if -m yarn-cluster is set:
+ -yid   YARN application ID of Flink YARN 
session to
+   connect to. Must not be set if 
JobManager HA
+   is used. In this case, JobManager RPC
+   location is automatically retrieved from
+   Zookeeper.
--- End diff --

I tried this, but I wonder why it's not logging that the job has been 
cancelled. It logs "All TaskManagers are connected", but I don't think that 
message is relevant when cancelling a job.

```
[cloudera@quickstart build-target]$  ./bin/flink cancel -m yarn-cluster 
-yid application_1447844011707_0038 b9b8f76616073d09c596545a3cda978f
2016-06-13 07:22:21,355 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at 
quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:22:22,277 INFO  org.apache.flink.yarn.YarnClusterClient   
- Start application client.
2016-06-13 07:22:22,288 INFO  org.apache.flink.yarn.ApplicationClient   
- Notification about new leader address 
akka.tcp://flink@10.0.2.15:51747/user/jobmanager with session ID null.
2016-06-13 07:22:22,290 INFO  org.apache.flink.yarn.ApplicationClient   
- Received address of new leader 
akka.tcp://flink@10.0.2.15:51747/user/jobmanager with session ID null.
2016-06-13 07:22:22,290 INFO  org.apache.flink.yarn.ApplicationClient   
- Disconnect from JobManager null.
Waiting until all TaskManagers have connected
2016-06-13 07:22:22,297 INFO  org.apache.flink.yarn.ApplicationClient   
- Trying to register at JobManager 
akka.tcp://flink@10.0.2.15:51747/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...
2016-06-13 07:22:22,542 INFO  org.apache.flink.yarn.ApplicationClient   
- Successfully registered at the ResourceManager using 
JobManager Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764]
All TaskManagers are connected
2016-06-13 07:22:22,945 INFO  org.apache.flink.yarn.YarnClusterClient   
- Shutting down YarnClusterClient from the client shutdown hook
2016-06-13 07:22:22,945 INFO  org.apache.flink.yarn.YarnClusterClient   
- Disconnecting YarnClusterClient from ApplicationMaster
2016-06-13 07:22:22,947 INFO  org.apache.flink.yarn.ApplicationClient   
- Stopped Application client.
2016-06-13 07:22:22,947 INFO  org.apache.flink.yarn.ApplicationClient   
- Disconnect from JobManager 
Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764].
2016-06-13 07:22:23,056 INFO  org.apache.flink.yarn.YarnClusterClient   
- Application application_1447844011707_0038 finished with 
state RUNNING and final state UNDEFINED at 0
[cloudera@quickstart build-target]$ 
```




[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327477#comment-15327477
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66800124
  
--- Diff: docs/apis/cli.md ---
@@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs.
configuration.
  -r,--running  Show only running programs and their 
JobIDs
  -s,--scheduledShow only scheduled programs and their 
JobIDs
+  Additional arguments if -m yarn-cluster is set:
+ -yid   YARN application ID of Flink YARN 
session to
+   connect to. Must not be set if 
JobManager HA
+   is used. In this case, JobManager RPC
+   location is automatically retrieved from
+   Zookeeper.
--- End diff --

I tried this, but I wonder why it's not logging that the job has been 
cancelled. It logs "All TaskManagers are connected", but I don't think that 
message is relevant when cancelling a job.

```
[cloudera@quickstart build-target]$  ./bin/flink cancel -m yarn-cluster 
-yid application_1447844011707_0038 b9b8f76616073d09c596545a3cda978f
2016-06-13 07:22:21,355 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at 
quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:22:22,277 INFO  org.apache.flink.yarn.YarnClusterClient   
- Start application client.
2016-06-13 07:22:22,288 INFO  org.apache.flink.yarn.ApplicationClient   
- Notification about new leader address 
akka.tcp://flink@10.0.2.15:51747/user/jobmanager with session ID null.
2016-06-13 07:22:22,290 INFO  org.apache.flink.yarn.ApplicationClient   
- Received address of new leader 
akka.tcp://flink@10.0.2.15:51747/user/jobmanager with session ID null.
2016-06-13 07:22:22,290 INFO  org.apache.flink.yarn.ApplicationClient   
- Disconnect from JobManager null.
Waiting until all TaskManagers have connected
2016-06-13 07:22:22,297 INFO  org.apache.flink.yarn.ApplicationClient   
- Trying to register at JobManager 
akka.tcp://flink@10.0.2.15:51747/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...
2016-06-13 07:22:22,542 INFO  org.apache.flink.yarn.ApplicationClient   
- Successfully registered at the ResourceManager using 
JobManager Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764]
All TaskManagers are connected
2016-06-13 07:22:22,945 INFO  org.apache.flink.yarn.YarnClusterClient   
- Shutting down YarnClusterClient from the client shutdown hook
2016-06-13 07:22:22,945 INFO  org.apache.flink.yarn.YarnClusterClient   
- Disconnecting YarnClusterClient from ApplicationMaster
2016-06-13 07:22:22,947 INFO  org.apache.flink.yarn.ApplicationClient   
- Stopped Application client.
2016-06-13 07:22:22,947 INFO  org.apache.flink.yarn.ApplicationClient   
- Disconnect from JobManager 
Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764].
2016-06-13 07:22:23,056 INFO  org.apache.flink.yarn.YarnClusterClient   
- Application application_1447844011707_0038 finished with 
state RUNNING and final state UNDEFINED at 0
[cloudera@quickstart build-target]$ 
```


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-06-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327463#comment-15327463
 ] 

ASF GitHub Bot commented on FLINK-3937:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66799100
  
--- Diff: docs/apis/cli.md ---
@@ -105,6 +105,10 @@ The command line can be used to
 
 ./bin/flink list -r
 
+-   List running Flink jobs inside Flink YARN session:
+
+./bin/flink list -m yarn-cluster -yid  -r
--- End diff --

Are there any tests for this functionality?


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Maximilian Michels
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.





[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66799100
  
--- Diff: docs/apis/cli.md ---
@@ -105,6 +105,10 @@ The command line can be used to
 
 ./bin/flink list -r
 
+-   List running Flink jobs inside Flink YARN session:
+
+./bin/flink list -m yarn-cluster -yid  -r
--- End diff --

Are there any tests for this functionality?



