[GitHub] flink issue #2081: [FLINK-4020][streaming-connectors] Move shard list queryi...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2081

Hi @rmetzger, thanks for letting me know. However, I'd like to close this PR for now, for the following reasons:

1. The new shard-to-subtask assignment logic introduced with this change will actually need to be moved again to run() as part of implementing Kinesis reshard handling [FLINK-3231](https://issues.apache.org/jira/browse/FLINK-3231).
2. I've tested this change a bit more on Kinesis streams with high shard counts, and it seems the implementation needs a stronger guarantee that all subtasks can get the shard list without failing with Amazon's LimitExceededException, even after 3 retries. Since the implementation for FLINK-3231 will have a separate thread that polls for changes in the shard list, I'd like to strengthen this guarantee as part of FLINK-3231's PR.

I'm almost done with FLINK-3231, and will reopen a PR to resolve FLINK-3231 and FLINK-4020 together. I'll keep you updated!

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-4020) Remove shard list querying from Kinesis consumer constructor
[ https://issues.apache.org/jira/browse/FLINK-4020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328943#comment-15328943 ]

ASF GitHub Bot commented on FLINK-4020:
---

Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2081

Hi @rmetzger, thanks for letting me know. However, I'd like to close this PR for now, for the following reasons:

1. The new shard-to-subtask assignment logic introduced with this change will actually need to be moved again to run() as part of implementing Kinesis reshard handling [FLINK-3231](https://issues.apache.org/jira/browse/FLINK-3231).
2. I've tested this change a bit more on Kinesis streams with high shard counts, and it seems the implementation needs a stronger guarantee that all subtasks can get the shard list without failing with Amazon's LimitExceededException, even after 3 retries. Since the implementation for FLINK-3231 will have a separate thread that polls for changes in the shard list, I'd like to strengthen this guarantee as part of FLINK-3231's PR.

I'm almost done with FLINK-3231, and will reopen a PR to resolve FLINK-3231 and FLINK-4020 together. I'll keep you updated!

> Remove shard list querying from Kinesis consumer constructor
> ------------------------------------------------------------
>
> Key: FLINK-4020
> URL: https://issues.apache.org/jira/browse/FLINK-4020
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
>
> Currently FlinkKinesisConsumer is querying for the whole list of shards in the constructor, forcing the client to be able to access Kinesis as well. This is also a drawback for handling Kinesis-side resharding, since we'd want all shard listing / shard-to-task assigning / shard end (result of resharding) handling logic to be capable of being independently done within task life cycle methods, with defined and definite results.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
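The guarantee discussed above — that every subtask obtains the shard list despite Amazon's LimitExceededException — is typically strengthened by retrying the rate-limited call with exponential backoff. Below is a minimal, hypothetical sketch, not Flink's actual implementation; the helper name and the use of RuntimeException as a stand-in for LimitExceededException are assumptions:

```java
import java.util.function.Supplier;

// Hypothetical helper: retry a rate-limited call (e.g. a shard listing)
// with exponential backoff before giving up.
public class ShardListRetry {

    static <T> T callWithBackoff(Supplier<T> call, int maxRetries, long baseBackoffMillis) {
        for (int attempt = 0; ; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) { // stand-in for LimitExceededException
                if (attempt >= maxRetries) {
                    throw e; // still failing after the configured retries
                }
                try {
                    // exponential backoff: base, 2*base, 4*base, ...
                    Thread.sleep(baseBackoffMillis << attempt);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```

With maxRetries = 3 the call is attempted up to 4 times; a separate polling thread such as the one mentioned for FLINK-3231 could invoke the same helper on every poll.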
[jira] [Commented] (FLINK-3857) Add reconnect attempt to Elasticsearch host
[ https://issues.apache.org/jira/browse/FLINK-3857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328918#comment-15328918 ]

ASF GitHub Bot commented on FLINK-3857:
---

Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1962

Hi @sbcd90, sorry for the late reply, as I'm currently busy with some other things. I'll be happy to help review again within the next 2~3 days.

> Add reconnect attempt to Elasticsearch host
> -------------------------------------------
>
> Key: FLINK-3857
> URL: https://issues.apache.org/jira/browse/FLINK-3857
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 1.1.0, 1.0.2
> Reporter: Fabian Hueske
> Assignee: Subhobrata Dey
>
> Currently, the connection to the Elasticsearch host is opened in {{ElasticsearchSink.open()}}. In case the connection is lost (maybe due to a changed DNS entry), the sink fails.
> I propose to catch the Exception for lost connections in the {{invoke()}} method and try to re-open the connection for a configurable number of times with a certain delay.
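The proposal quoted above — catch the exception for lost connections in invoke() and re-open the connection a configurable number of times with a certain delay — can be sketched as follows. This is a hedged sketch, not the actual ElasticsearchSink code; Connection and ConnectionFactory are hypothetical stand-ins for the Elasticsearch client:

```java
// Hypothetical sketch of the proposed behavior: if sending a record fails,
// wait a configurable delay, re-open the connection, and try again, up to
// a configurable number of reconnect attempts.
public class ReconnectingSink {

    interface Connection { void send(String record); }  // throws RuntimeException when the connection is lost
    interface ConnectionFactory { Connection open(); }

    private final ConnectionFactory factory;
    private final int maxReconnectAttempts;
    private final long reconnectDelayMillis;
    private Connection connection;

    ReconnectingSink(ConnectionFactory factory, int maxReconnectAttempts, long reconnectDelayMillis) {
        this.factory = factory;
        this.maxReconnectAttempts = maxReconnectAttempts;
        this.reconnectDelayMillis = reconnectDelayMillis;
        this.connection = factory.open();  // analogous to opening in ElasticsearchSink.open()
    }

    void invoke(String record) {
        for (int attempt = 0; ; attempt++) {
            try {
                connection.send(record);
                return;
            } catch (RuntimeException e) {
                if (attempt >= maxReconnectAttempts) {
                    throw e;  // give up after the configured number of attempts
                }
                try {
                    Thread.sleep(reconnectDelayMillis);  // configured delay between attempts
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
                connection = factory.open();  // try to re-open the connection
            }
        }
    }
}
```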
[GitHub] flink issue #1962: [FLINK-3857][Streaming Connectors]Add reconnect attempt t...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1962

Hi @sbcd90, sorry for the late reply, as I'm currently busy with some other things. I'll be happy to help review again within the next 2~3 days.
[jira] [Comment Edited] (FLINK-4069) Kafka Consumer should not initialize on construction
[ https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328840#comment-15328840 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-4069 at 6/14/16 2:50 AM:
---

Thanks for creating this JIRA Shannon. However, there's already a previously opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. Would you be ok with tracking this issue on FLINK-4023 and closing this as a duplicate issue? I've referenced a link to this JIRA on FLINK-4023.

was (Author: tzulitai):
Thanks for creating this JIRA Shannon. However, there's already a previously opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. Let's track this issue on FLINK-4023 and close this as a duplicate issue :) I've referenced a link to this JIRA on FLINK-4023.

> Kafka Consumer should not initialize on construction
> ----------------------------------------------------
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.0.3
> Reporter: Shannon Carey
>
> The Kafka Consumer connector currently interacts over the network with Kafka in order to get partition metadata when the class is constructed. Instead, it should do that work when the job actually begins to run (for example, in AbstractRichFunction#open() of FlinkKafkaConsumer0?).
>
> The main weakness of broker querying in the constructor is that if there are network problems, Flink might take a long time (e.g. ~1hr) inside the user-supplied main() method while it attempts to contact each broker and perform retries. In general, setting up the Kafka partitions does not seem strictly necessary as part of execution of main() in order to set up the job plan/topology.
>
> However, as Robert Metzger mentions, there are important concerns with how Kafka partitions are handled:
> "The main reason why we do the querying centrally is:
> a) avoid overloading the brokers
> b) send the same list of partitions (in the same order) to all parallel consumers to do fixed partition assignments (also across restarts). When we do the querying in the open() method, we need to make sure that all partitions are assigned, without duplicates (also after restarts in case of failures)."
>
> See also the mailing list discussion: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html
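Robert's point (b) above — sending the same ordered partition list to all parallel consumers so assignments are fixed across restarts — can be illustrated with a small sketch. This is illustrative only, not Flink's actual assignment code: each subtask takes every numSubtasks-th partition from the shared ordered list, so the assignment is deterministic, free of duplicates, and leaves no partition out.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of deterministic partition-to-subtask assignment: every subtask
// derives its share from the same ordered list, so all subtasks agree on
// the full assignment without coordinating at runtime.
public class PartitionAssigner {

    static List<String> assign(List<String> orderedPartitions, int subtaskIndex, int numSubtasks) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < orderedPartitions.size(); i++) {
            // round-robin by index: identical result on every restart,
            // as long as the input list and its order are identical
            if (i % numSubtasks == subtaskIndex) {
                mine.add(orderedPartitions.get(i));
            }
        }
        return mine;
    }
}
```

The union of all subtasks' results covers every partition exactly once, which is exactly the "no duplicates, all assigned" property the open()-based approach would otherwise have to enforce.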
[jira] [Comment Edited] (FLINK-4069) Kafka Consumer should not initialize on construction
[ https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328840#comment-15328840 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-4069 at 6/14/16 2:50 AM:
---

Thanks for creating this JIRA Shannon. However, there's already a previously opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. Let's track this issue on FLINK-4023 and close this as a duplicate issue :) I've referenced a link to this JIRA on FLINK-4023.

was (Author: tzulitai):
Thanks for creating this JIRA Shannon. However, there's already a previously opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. Let's track this issue on FLINK-4023 on close this as a duplicate issue :) I've referenced a link to this JIRA on FLINK-4023.

> Kafka Consumer should not initialize on construction
> ----------------------------------------------------
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.0.3
> Reporter: Shannon Carey
>
> [...]
[jira] [Commented] (FLINK-4069) Kafka Consumer should not initialize on construction
[ https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328840#comment-15328840 ]

Tzu-Li (Gordon) Tai commented on FLINK-4069:
---

Thanks for creating this JIRA Shannon. However, there's already a previously opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. Let's track this issue on FLINK-4023 and close this as a duplicate issue :) I've referenced a link to this JIRA on FLINK-4023.

> Kafka Consumer should not initialize on construction
> ----------------------------------------------------
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.0.3
> Reporter: Shannon Carey
>
> [...]
[jira] [Commented] (FLINK-4023) Move Kafka consumer partition discovery from constructor to open()
[ https://issues.apache.org/jira/browse/FLINK-4023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328831#comment-15328831 ]

Tzu-Li (Gordon) Tai commented on FLINK-4023:
---

https://issues.apache.org/jira/browse/FLINK-4069

The above is a duplicate of this JIRA, opened after this one. I'm referencing the link here since the description in FLINK-4069 covers additional information on the problem, as well as a link to a related discussion thread on the mailing list.

> Move Kafka consumer partition discovery from constructor to open()
> ------------------------------------------------------------------
>
> Key: FLINK-4023
> URL: https://issues.apache.org/jira/browse/FLINK-4023
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Michal Harish
> Priority: Minor
> Labels: kafka, kafka-0.8
>
> Currently, Flink queries Kafka for partition information when creating the Kafka consumer. This is done on the client when submitting the Flink job, which requires the client to be able to fetch the partition data from Kafka, which may only be accessible from the cluster environment where the tasks will be running. Moving the partition discovery to the open() method should solve this problem.
[jira] [Commented] (FLINK-4061) about flink jdbc connect oracle db exists a crital bug
[ https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328758#comment-15328758 ]

dengchangfu commented on FLINK-4061:
---

In China, the ability to integrate Flink with traditional relational databases is very important. If Flink can do this, it will quickly fill a gap in the market for batch and streaming. I've been using Spark for almost two years, but it does not really meet real-time requirements, so I want to try Flink. Thanks.

> about flink jdbc connect oracle db exists a crital bug
> ------------------------------------------------------
>
> Key: FLINK-4061
> URL: https://issues.apache.org/jira/browse/FLINK-4061
> Project: Flink
> Issue Type: Bug
> Components: Batch
> Affects Versions: 1.1.0
> Environment: ubuntu, jdk1.8.0, Start a Local Flink Cluster
> Reporter: dengchangfu
> Priority: Blocker
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I use flink-jdbc to connect to an Oracle DB for ETL, so I wrote a demo to test the feature. The code is simple, but after I submit the app, an exception happens.
>
> Exception info:
>
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:231)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> My code:
>
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
> import org.apache.flink.api.table.Row;
> import org.apache.flink.api.table.typeutils.RowTypeInfo;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
> import java.sql.ResultSet;
> import java.sql.Types;
>
> /**
>  * Skeleton for a Flink Job.
>  *
>  * For a full example of a Flink Job, see the WordCountJob.java file in the
>  * same package/directory or have a look at the website.
>  *
>  * You can also generate a .jar file that you can submit on your Flink
>  * cluster. Just type
>  *     mvn clean package
>  * in the projects root directory. You will find the jar in
>  *     target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
>  */
> public class Job {
>     public static final TypeInformation[] fieldTypes = new TypeInformation[]{
>         BasicTypeInfo.STRING_TYPE_INFO,
>         BasicTypeInfo.FLOAT_TYPE_INFO
>     };
>     public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
>
>     public static void main(String[] args) {
>         // set up the execution environment
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
>             .setDrivername("oracle.jdbc.driver.OracleDriver")
>             .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
>             .setUsername("crmii")
>             .setPassword("crmii")
>             .setQuery("select CLIENT_ID,OCCUR_BALANCE from HS_ASSET.FUNDJOUR@OTC")
>             .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
>             .setRowTypeInfo(rowTypeInfo);
>         DataSet source = env.createInput(inputBuilder.finish());
>         source.output(JDBCOutputFormat.buildJDBCOutputFormat()
>             .setDrivername("oracle.jdbc.driver.OracleDriver")
>             .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
>             .setUsername("crmii")
>             .setPassword("crmii")
>             .setQuery("insert into dengabc (client_id,salary) values(?,?)")
>             .setSqlTypes(new int[]{Types.VARCHAR, Types.DOUBLE})
>             .finish());
>         //source.print();
>         //source.first(20).print();
>         //dbData.print();
>         /**
>          * Here, you can start creating your execution plan for Flink.
>          *
>          * Start with getting some data from the environment, like
>          *     env.readTextFile(textPath);
>          *
>          * then, transform the resulting DataSet using operations like
>          *     .filter()
>          *     .flatMap()
>          *     .join()
>          *     .coGroup()
>          * and many more.
>          * Have a look at the programming guide for the Java API:
>          *
>          * http://flink.apache.org/docs/latest/apis/batch/index.html
>          *
>          * and the examples
>          *
[jira] [Commented] (FLINK-4061) about flink jdbc connect oracle db exists a crital bug
[ https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328749#comment-15328749 ]

dengchangfu commented on FLINK-4061:
---

I also think this is the problem: the statement was not initialized. But if I want to use flink-jdbc to connect to Oracle, what should I do? I tried 1.0.3 (the latest release), but it threw some other exceptions, so I cloned the source code, compiled it, and used that jar (1.1-SNAPSHOT).

> about flink jdbc connect oracle db exists a crital bug
> ------------------------------------------------------
>
> Key: FLINK-4061
> URL: https://issues.apache.org/jira/browse/FLINK-4061
> Project: Flink
> Issue Type: Bug
> Components: Batch
> Affects Versions: 1.1.0
> Environment: ubuntu, jdk1.8.0, Start a Local Flink Cluster
> Reporter: dengchangfu
> Priority: Blocker
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> [...]
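The NullPointerException at JDBCInputFormat.open() line 231 is consistent with the diagnosis in the comments: the statement was never initialized. Below is a sketch of the defensive pattern that avoids this class of failure by failing fast with a descriptive message instead of an NPE later. This is hypothetical code, not the actual JDBCInputFormat; the class and method names are made up for illustration:

```java
// Hypothetical sketch: validate initialization state in open() so a missing
// driver or failed setup surfaces as a clear error, not a later NPE.
public class GuardedInputFormat {

    private Object statement; // stands in for the JDBC PreparedStatement

    void configure(boolean driverLoaded) {
        if (driverLoaded) {
            // real code would load the driver class and prepare the statement here
            statement = new Object();
        }
    }

    void open() {
        if (statement == null) {
            throw new IllegalStateException(
                "JDBC statement was not initialized; was the driver class found on the classpath?");
        }
        // proceed to execute the query...
    }
}
```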
[jira] [Commented] (FLINK-4061) about flink jdbc connect oracle db exists a crital bug
[ https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328742#comment-15328742 ]

dengchangfu commented on FLINK-4061:
---

I have added the Oracle jar as a dependency in the pom, but the problem remains. User-friendliness is really important for open source. Thank you for the reply.

> about flink jdbc connect oracle db exists a crital bug
> ------------------------------------------------------
>
> Key: FLINK-4061
> URL: https://issues.apache.org/jira/browse/FLINK-4061
> Project: Flink
> Issue Type: Bug
> Components: Batch
> Affects Versions: 1.1.0
> Environment: ubuntu, jdk1.8.0, Start a Local Flink Cluster
> Reporter: dengchangfu
> Priority: Blocker
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> [...]
[jira] [Commented] (FLINK-3971) Aggregates handle null values incorrectly.
[ https://issues.apache.org/jira/browse/FLINK-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328667#comment-15328667 ]

ASF GitHub Bot commented on FLINK-3971:
---

Github user gallenvara commented on the issue: https://github.com/apache/flink/pull/2049

@fhueske Done.

> Aggregates handle null values incorrectly.
> ------------------------------------------
>
> Key: FLINK-3971
> URL: https://issues.apache.org/jira/browse/FLINK-3971
> Project: Flink
> Issue Type: Bug
> Components: Table API
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
> Assignee: GaoLun
> Priority: Critical
> Fix For: 1.1.0
>
> Table API and SQL aggregates are supposed to ignore null values, e.g., {{sum(1,2,null,4)}} is supposed to return {{7}}.
> The current implementation is correct if at least one valid value is present; however, it is incorrect if only null values are aggregated. {{sum(null, null, null)}} should return {{null}} instead of {{0}}.
> Currently only the Count aggregate handles the case of null-values-only correctly.
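The expected semantics described in the issue can be pinned down with a small sketch (illustrative only, not the Table API implementation): null inputs are skipped, and when every input is null the aggregate itself is null rather than 0.

```java
// Sketch of null-aware sum semantics: ignore nulls when at least one valid
// value is present; return null, not 0, when only nulls are aggregated.
public class NullAwareSum {

    static Integer sum(Integer... values) {
        Integer acc = null;
        for (Integer v : values) {
            if (v == null) {
                continue;                      // ignore null values
            }
            acc = (acc == null) ? v : acc + v; // start or extend the running sum
        }
        return acc;                            // null when no valid value was seen
    }
}
```

Count is the one aggregate the issue says already behaves correctly, since counting nulls-only legitimately yields 0.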
[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...
Github user gallenvara commented on the issue: https://github.com/apache/flink/pull/2049 @fhueske Done.
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328612#comment-15328612 ]

ASF GitHub Bot commented on FLINK-3921:
---

Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2060

Sorry for the delay, I got busy. Thank you @StephanEwen for your review. Updated, please have a look. Thanks. cc @greghogan

> StringParser not specifying encoding to use
> -------------------------------------------
>
> Key: FLINK-3921
> URL: https://issues.apache.org/jira/browse/FLINK-3921
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.0.3
> Reporter: Tatu Saloranta
> Assignee: Rekha Joshi
> Priority: Trivial
>
> Class `flink.types.parser.StringParser` has javadocs indicating that contents are expected to be ASCII, similar to `StringValueParser`. That makes sense, but when constructing the actual instance, no encoding is specified; on line 66, for example:
>     this.result = new String(bytes, startPos+1, i - startPos - 2);
> which leads to using whatever the default platform encoding is. If contents really are always ASCII (I would not count on that, as the parser is used from the CSV reader), it is not a big deal, but it can lead to the usual Latin-1-vs-UTF-8 issues.
> So I think the encoding should be explicitly specified, whatever is to be used: the javadocs claim ASCII, so it could be "us-ascii", but it could well be UTF-8 or even ISO-8859-1.
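The fix the issue asks for is to pass an explicit charset to the String constructor instead of relying on the platform default. A minimal sketch, with a hypothetical method mirroring the offset/length arithmetic quoted above (US-ASCII chosen here only because the javadocs claim ASCII; UTF-8 or ISO-8859-1 would fit the same pattern):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: strip the surrounding quote characters and decode
// with an explicit charset so the result does not depend on the JVM's
// default platform encoding.
public class StringParserEncoding {

    // startPos points at the opening quote, endPos just past the closing quote
    static String parse(byte[] bytes, int startPos, int endPos) {
        return new String(bytes, startPos + 1, endPos - startPos - 2, StandardCharsets.US_ASCII);
    }
}
```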
[GitHub] flink issue #2060: [FLINK-3921] StringParser encoding
Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2060

Sorry for the delay, I got busy. Thank you @StephanEwen for your review. Updated, please have a look. Thanks. cc @greghogan
[jira] [Created] (FLINK-4069) Kafka Consumer should not initialize on construction
Shannon Carey created FLINK-4069:
---

Summary: Kafka Consumer should not initialize on construction
Key: FLINK-4069
URL: https://issues.apache.org/jira/browse/FLINK-4069
Project: Flink
Issue Type: Improvement
Components: Kafka Connector
Affects Versions: 1.0.3
Reporter: Shannon Carey

The Kafka Consumer connector currently interacts over the network with Kafka in order to get partition metadata when the class is constructed. Instead, it should do that work when the job actually begins to run (for example, in AbstractRichFunction#open() of FlinkKafkaConsumer0?).

The main weakness of broker querying in the constructor is that if there are network problems, Flink might take a long time (e.g. ~1hr) inside the user-supplied main() method while it attempts to contact each broker and perform retries. In general, setting up the Kafka partitions does not seem strictly necessary as part of execution of main() in order to set up the job plan/topology.

However, as Robert Metzger mentions, there are important concerns with how Kafka partitions are handled:

"The main reason why we do the querying centrally is:
a) avoid overloading the brokers
b) send the same list of partitions (in the same order) to all parallel consumers to do fixed partition assignments (also across restarts). When we do the querying in the open() method, we need to make sure that all partitions are assigned, without duplicates (also after restarts in case of failures)."

See also the mailing list discussion: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html
[jira] [Commented] (FLINK-3340) Fix object juggling in drivers
[ https://issues.apache.org/jira/browse/FLINK-3340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328014#comment-15328014 ] ASF GitHub Bot commented on FLINK-3340: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/1626 Thank you @xhumanoid for reporting this! I will make the fix. > Fix object juggling in drivers > -- > > Key: FLINK-3340 > URL: https://issues.apache.org/jira/browse/FLINK-3340 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Critical > Fix For: 1.0.0 > > > {{ReduceDriver}}, {{ReduceCombineDriver}}, and {{ChainedAllReduceDriver}} are > not properly tracking objects for reuse. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #1626: [FLINK-3340] [runtime] Fix object juggling in drivers
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/1626 Thank you @xhumanoid for reporting this! I will make the fix.
[jira] [Commented] (FLINK-3323) Add documentation for NiFi connector
[ https://issues.apache.org/jira/browse/FLINK-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327932#comment-15327932 ] ASF GitHub Bot commented on FLINK-3323: --- GitHub user smarthi opened a pull request: https://github.com/apache/flink/pull/2099 Flink 3323: Add documentation for NiFi connector Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue ("[FLINK-3323] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/smarthi/flink FLINK-3323 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2099.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2099 commit f9a2e54cba178f1931528fb31638c0e72705b7b4 Author: smarthi Date: 2016-05-30T13:04:25Z Initial Commit of nifi docs commit db1d8628f103bc7d319db09a151bb555e48694f9 Author: smarthi Date: 2016-05-31T11:24:11Z Initial Commit of nifi docs commit 
878c74254abac1cb62d1ebef822b8c4f31cd48a2 Author: smarthi Date: 2016-06-13T18:43:43Z FLINK-3323: Add documentation for NiFi connector > Add documentation for NiFi connector > > > Key: FLINK-3323 > URL: https://issues.apache.org/jira/browse/FLINK-3323 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Matthias J. Sax >Assignee: Suneel Marthi >Priority: Minor > > Add Nifi-connector documentation to the "Data Stream / Connectors" web page > docs. See also FLINK-3324 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3323) Add documentation for NiFi connector
[ https://issues.apache.org/jira/browse/FLINK-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Suneel Marthi updated FLINK-3323: - Summary: Add documentation for NiFi connector (was: Nifi connector not documented) > Add documentation for NiFi connector > > > Key: FLINK-3323 > URL: https://issues.apache.org/jira/browse/FLINK-3323 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Matthias J. Sax >Assignee: Suneel Marthi >Priority: Minor > > Add Nifi-connector documentation to the "Data Stream / Connectors" web page > docs. See also FLINK-3324 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4066) RabbitMQ source, customize queue arguments
[ https://issues.apache.org/jira/browse/FLINK-4066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kanstantsin Kamkou closed FLINK-4066. - > RabbitMQ source, customize queue arguments > -- > > Key: FLINK-4066 > URL: https://issues.apache.org/jira/browse/FLINK-4066 > Project: Flink > Issue Type: Improvement >Reporter: Kanstantsin Kamkou >Priority: Minor > Labels: queue, rabbitmq > Fix For: 1.1.0 > > > Please, add a functionality to customize the line (custom attributes for a > queue). > {code}channel.queueDeclare(queueName, false, false, false, null);{code} > Thank you. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
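For illustration, the RabbitMQ Java client's `queueDeclare` accepts a `Map<String, Object>` of queue attributes as its last parameter (the `null` in the quoted line). A sketch of what the requested customization could look like; the specific attribute values below are examples, not anything the ticket prescribes, and the broker call itself is shown as a comment since it needs a live connection.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: build the arguments map that queueDeclare's last parameter accepts.
class QueueArgs {
    static Map<String, Object> customArguments() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 60_000); // example: expire messages after 60 s
        args.put("x-max-length", 10_000);  // example: cap the queue length
        return args;
    }
    // With a live Channel, the customized declaration would be:
    // channel.queueDeclare(queueName, false, false, false, customArguments());
}
```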
[jira] [Commented] (FLINK-4051) RabbitMQ Source might not react to cancel signal
[ https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327788#comment-15327788 ] Robert Metzger commented on FLINK-4051: --- Great, thank you! > RabbitMQ Source might not react to cancel signal > > > Key: FLINK-4051 > URL: https://issues.apache.org/jira/browse/FLINK-4051 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Subhankar Biswas > > As reported here > https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517, > the RabbitMQ source might block forever / ignore the cancelling signal, if > it's listening to an empty queue. > Fix: call nextDelivery() with a timeout. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
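The proposed fix can be sketched with a timed poll. This is a hypothetical simplification of a source's run loop, not the connector's actual code: `BlockingQueue.poll(timeout)` stands in for RabbitMQ's `QueueingConsumer.nextDelivery(timeout)`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of a cancellable source loop: a timed poll returns null
// after the timeout instead of blocking forever on an empty queue, so the
// cancel flag is re-checked promptly.
class CancellableSource {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final BlockingQueue<String> queue;

    CancellableSource(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    void cancel() {
        running.set(false);
    }

    void run(Consumer<String> collector) throws InterruptedException {
        while (running.get()) {
            // Timed poll instead of an unbounded take()/nextDelivery().
            String msg = queue.poll(50, TimeUnit.MILLISECONDS);
            if (msg != null) {
                collector.accept(msg);
            }
        }
    }
}
```

With an unbounded `nextDelivery()`, the loop condition is only re-evaluated after a message arrives, which is exactly why an empty queue can make the source ignore cancellation.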
[jira] [Assigned] (FLINK-4045) Test AMQP connector using Apache ActiveMQ embedded broker
[ https://issues.apache.org/jira/browse/FLINK-4045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Subhankar Biswas reassigned FLINK-4045: --- Assignee: Subhankar Biswas > Test AMQP connector using Apache ActiveMQ embedded broker > - > > Key: FLINK-4045 > URL: https://issues.apache.org/jira/browse/FLINK-4045 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Subhankar Biswas > > Currently, our (RabbitMQ) AMQP client is not tested in any integration tests. > Apache ActiveMQ implements an AMQP broker and they provide an embedded > implementation we can use for the integration tests: > http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4051) RabbitMQ Source might not react to cancel signal
[ https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327758#comment-15327758 ] Subhankar Biswas commented on FLINK-4051: - OK, I'll start working on it. > RabbitMQ Source might not react to cancel signal > > > Key: FLINK-4051 > URL: https://issues.apache.org/jira/browse/FLINK-4051 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Subhankar Biswas > > As reported here > https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517, > the RabbitMQ source might block forever / ignore the cancelling signal, if > it's listening to an empty queue. > Fix: call nextDelivery() with a timeout. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3944) Add optimization rules to reorder Cartesian products and joins
[ https://issues.apache.org/jira/browse/FLINK-3944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327693#comment-15327693 ] ASF GitHub Bot commented on FLINK-3944: --- GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/2098 [revert] [FLINK-3944] [tableAPI] Reverts "Add rewrite rules to reorder Cartesian products and joins." This reverts commit 85793a25a78ba5be96fabc2a26569318c6b53853. Added rewrite rules blow up search space which cannot be effectively pruned without cardinality estimates. You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink tableCrossRevert Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2098.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2098 commit df438a9fe5e9fcd191c371a888056c49aa246992 Author: Fabian Hueske Date: 2016-06-13T16:18:42Z [revert] [FLINK-3944] [tableAPI] Reverts "Add rewrite rules to reorder Cartesian products and joins." This reverts commit 85793a25a78ba5be96fabc2a26569318c6b53853. Added rewrite rules blow up search space which cannot be effectively pruned without cardinality estimates. > Add optimization rules to reorder Cartesian products and joins > -- > > Key: FLINK-3944 > URL: https://issues.apache.org/jira/browse/FLINK-3944 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.1.0 > > > Currently, we do not support the execution of Cartesian products. > Because we do not optimize the order of joins (due to missing statistics), > joins are executed in the order in which they are specified. This works well > for the Table API, however it can be problematic in case of SQL queries where > the order of tables in the FROM clause should not matter. 
> In case of SQL queries, it can happen that the optimized plan contains > Cartesian products because joins are not reordered. If we add optimization > rules that switch Cartesian products and joins, such situations can be > resolved. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2098: [revert] [FLINK-3944] [tableAPI] Reverts "Add rewr...
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/2098 [revert] [FLINK-3944] [tableAPI] Reverts "Add rewrite rules to reorder Cartesian products and joins." This reverts commit 85793a25a78ba5be96fabc2a26569318c6b53853. Added rewrite rules blow up search space which cannot be effectively pruned without cardinality estimates. You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink tableCrossRevert Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2098.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2098 commit df438a9fe5e9fcd191c371a888056c49aa246992 Author: Fabian Hueske Date: 2016-06-13T16:18:42Z [revert] [FLINK-3944] [tableAPI] Reverts "Add rewrite rules to reorder Cartesian products and joins." This reverts commit 85793a25a78ba5be96fabc2a26569318c6b53853. Added rewrite rules blow up search space which cannot be effectively pruned without cardinality estimates.
[jira] [Commented] (FLINK-3971) Aggregates handle null values incorrectly.
[ https://issues.apache.org/jira/browse/FLINK-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327689#comment-15327689 ] ASF GitHub Bot commented on FLINK-3971: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2049 Hi @gallenvara, you have a merge commit in your changes. Can you rebase your changes to the current master and squash the commits? Thanks! > Aggregates handle null values incorrectly. > -- > > Key: FLINK-3971 > URL: https://issues.apache.org/jira/browse/FLINK-3971 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: GaoLun >Priority: Critical > Fix For: 1.1.0 > > > Table API and SQL aggregates are supposed to ignore null values, e.g., > {{sum(1,2,null,4)}} is supposed to return {{7}}. > The current implementation is correct if at least one valid value is > present; however, it is incorrect if only null values are aggregated. {{sum(null, > null, null)}} should return {{null}} instead of {{0}}. > Currently only the Count aggregate handles the case of null-values-only > correctly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
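The null semantics the ticket describes can be shown with a minimal sketch (illustrative only, not the Table API's actual aggregate implementation): null inputs are ignored, and a sum over only nulls yields null rather than 0.

```java
// Minimal sketch of the expected aggregate semantics for SUM.
class NullAwareSum {
    static Integer sum(Integer... values) {
        Integer acc = null;
        for (Integer v : values) {
            if (v == null) {
                continue;                      // aggregates ignore null values
            }
            acc = (acc == null) ? v : acc + v; // start accumulating at the first valid value
        }
        return acc;                            // null if no valid value was seen
    }
}
```

This matches the two cases from the description: `sum(1, 2, null, 4)` gives 7, and `sum(null, null, null)` gives null instead of 0.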
[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2049 Hi @gallenvara, you have a merge commit in your changes. Can you rebase your changes to the current master and squash the commits? Thanks!
[jira] [Commented] (FLINK-3944) Add optimization rules to reorder Cartesian products and joins
[ https://issues.apache.org/jira/browse/FLINK-3944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327687#comment-15327687 ] Fabian Hueske commented on FLINK-3944: -- I will open a PR to revert the commit. > Add optimization rules to reorder Cartesian products and joins > -- > > Key: FLINK-3944 > URL: https://issues.apache.org/jira/browse/FLINK-3944 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.1.0 > > > Currently, we do not support the execution of Cartesian products. > Because we do not optimize the order of joins (due to missing statistics), > joins are executed in the order in which they are specified. This works well > for the Table API, however it can be problematic in case of SQL queries where > the order of tables in the FROM clause should not matter. > In case of SQL queries, it can happen that the optimized plan contains > Cartesian products because joins are not reordered. If we add optimization > rules that switch Cartesian products and joins, such situations can be > resolved. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (FLINK-3944) Add optimization rules to reorder Cartesian products and joins
[ https://issues.apache.org/jira/browse/FLINK-3944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reopened FLINK-3944: -- > Add optimization rules to reorder Cartesian products and joins > -- > > Key: FLINK-3944 > URL: https://issues.apache.org/jira/browse/FLINK-3944 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.1.0 > > > Currently, we do not support the execution of Cartesian products. > Because we do not optimize the order of joins (due to missing statistics), > joins are executed in the order in which they are specified. This works well > for the Table API, however it can be problematic in case of SQL queries where > the order of tables in the FROM clause should not matter. > In case of SQL queries, it can happen that the optimized plan contains > Cartesian products because joins are not reordered. If we add optimization > rules that switch Cartesian products and joins, such situations can be > resolved. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3944) Add optimization rules to reorder Cartesian products and joins
[ https://issues.apache.org/jira/browse/FLINK-3944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327666#comment-15327666 ] Fabian Hueske commented on FLINK-3944: -- I tried the fix on another query and found that the search space grew too large due to the added rewrite rules. On a moderately complex query, the optimization did not terminate. Since we do not have reliable estimates, the optimizer cannot effectively prune the search space and the enumeration of alternatives takes too much time. I propose to revert this commit. Users will have to specify tables in an order that does not include Cartesian products. > Add optimization rules to reorder Cartesian products and joins > -- > > Key: FLINK-3944 > URL: https://issues.apache.org/jira/browse/FLINK-3944 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.1.0 > > > Currently, we do not support the execution of Cartesian products. > Because we do not optimize the order of joins (due to missing statistics), > joins are executed in the order in which they are specified. This works well > for the Table API, however it can be problematic in case of SQL queries where > the order of tables in the FROM clause should not matter. > In case of SQL queries, it can happen that the optimized plan contains > Cartesian products because joins are not reordered. If we add optimization > rules that switch Cartesian products and joins, such situations can be > resolved. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3340) Fix object juggling in drivers
[ https://issues.apache.org/jira/browse/FLINK-3340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327660#comment-15327660 ] ASF GitHub Bot commented on FLINK-3340: --- Github user xhumanoid commented on the issue: https://github.com/apache/flink/pull/1626 Hi, I know the issue is closed, but there may be a small bug: https://github.com/apache/flink/pull/1626/files#diff-4a133896fec62bcabc1120b0df8cb8daR205 (hashCode() recurses on this.hashCode()). > Fix object juggling in drivers > -- > > Key: FLINK-3340 > URL: https://issues.apache.org/jira/browse/FLINK-3340 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Critical > Fix For: 1.0.0 > > > {{ReduceDriver}}, {{ReduceCombineDriver}}, and {{ChainedAllReduceDriver}} are > not properly tracking objects for reuse. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #1626: [FLINK-3340] [runtime] Fix object juggling in drivers
Github user xhumanoid commented on the issue: https://github.com/apache/flink/pull/1626 Hi, I know the issue is closed, but there may be a small bug: https://github.com/apache/flink/pull/1626/files#diff-4a133896fec62bcabc1120b0df8cb8daR205 (hashCode() recurses on this.hashCode()).
[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327635#comment-15327635 ] ASF GitHub Bot commented on FLINK-3859: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2088 @fhueske Thanks for reviewing my code. I totally forgot to test the comparisons. I will fix the issues and see what I can do for fixed precision / scale cases. Would be great if you could implement the corresponding aggregation functions. > Add BigDecimal/BigInteger support to Table API > -- > > Key: FLINK-3859 > URL: https://issues.apache.org/jira/browse/FLINK-3859 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Since FLINK-3786 has been solved, we can now start integrating > BigDecimal/BigInteger into the Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327625#comment-15327625 ] ASF GitHub Bot commented on FLINK-3859: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2088#discussion_r66814270 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala --- @@ -143,15 +143,30 @@ object ScalarFunctions { new MultiTypeMethodCallGen(BuiltInMethods.ABS)) addSqlFunction( +ABS, +Seq(BIG_DEC_TYPE_INFO), +new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC)) + + addSqlFunction( --- End diff -- `FLOAT`, `INT` etc. is supported implicitly as they `shouldAutocastTo` `DOUBLE`. The most suitable method is later chosen by the Java compiler. Calcite's `SqlFunctions` provides methods for all basic types. > Add BigDecimal/BigInteger support to Table API > -- > > Key: FLINK-3859 > URL: https://issues.apache.org/jira/browse/FLINK-3859 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Since FLINK-3786 has been solved, we can now start integrating > BigDecimal/BigInteger into the Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4068) Move constant computations out of code-generated `flatMap` functions.
Fabian Hueske created FLINK-4068: Summary: Move constant computations out of code-generated `flatMap` functions. Key: FLINK-4068 URL: https://issues.apache.org/jira/browse/FLINK-4068 Project: Flink Issue Type: Improvement Components: Table API Affects Versions: 1.1.0 Reporter: Fabian Hueske The generated functions for expressions of the Table API or SQL include constant computations. For instance, the code generated for a predicate like: {code} myInt < (10 + 20) {code} looks roughly like: {code} public void flatMap(Row in, Collector out) { Integer in1 = in.productElement(1); int temp = 10 + 20; if (in1 < temp) { out.collect(in); } } {code} In this example, the computation of {{temp}} is constant and could be moved out of the {{flatMap()}} method. The same might apply to generated functions other than {{FlatMap}} as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
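For illustration, a hoisted version of the code above could look roughly like this. This is a hand-written sketch of the optimization's effect, not actual generated code: the constant subexpression is evaluated once, outside the per-record path.

```java
// Sketch: the constant 10 + 20 is computed once (here as a static final
// field; in generated code it could equally live in open()), so each
// per-record call only performs the comparison.
class HoistedFilter {
    private static final int TEMP = 10 + 20; // hoisted constant computation

    static boolean accept(int in1) {
        return in1 < TEMP;                   // per-record work: one comparison
    }
}
```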
[jira] [Commented] (FLINK-4051) RabbitMQ Source might not react to cancel signal
[ https://issues.apache.org/jira/browse/FLINK-4051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327604#comment-15327604 ] Robert Metzger commented on FLINK-4051: --- On the other hand, the Javadoc of take() states that the method is interruptible, and Flink interrupts a thread if it's not reacting to a cancel signal. Therefore, I'm wondering if this is really an issue or not. It seems that you are very eager to contribute to Flink. How about the following? - You look into FLINK-4045 (without the renaming, just adding tests for the RabbitMQ connector) - Once that is finished, we can add a specific test for RMQ source cancellation. Then, we can experiment with it and see how it reacts. What do you think? > RabbitMQ Source might not react to cancel signal > > > Key: FLINK-4051 > URL: https://issues.apache.org/jira/browse/FLINK-4051 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Subhankar Biswas > > As reported here > https://issues.apache.org/jira/browse/FLINK-3763?focusedCommentId=15322517=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15322517, > the RabbitMQ source might block forever / ignore the cancelling signal, if > it's listening to an empty queue. > Fix: call nextDelivery() with a timeout. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327591#comment-15327591 ] ASF GitHub Bot commented on FLINK-3859: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2088 I think we also need aggregation functions for `DECIMAL`, otherwise this won't work: ``` SELECT sum(myInt * 1.23) FROM MyTable ``` > Add BigDecimal/BigInteger support to Table API > -- > > Key: FLINK-3859 > URL: https://issues.apache.org/jira/browse/FLINK-3859 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Since FLINK-3786 has been solved, we can now start integrating > BigDecimal/BigInteger into the Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327589#comment-15327589 ] Robert Metzger commented on FLINK-3763: --- Okay. I'll investigate this issue and then decide how to proceed with FLINK-4051. Thank you for the report. > RabbitMQ Source/Sink standardize connection parameters > -- > > Key: FLINK-3763 > URL: https://issues.apache.org/jira/browse/FLINK-3763 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.0.1 >Reporter: Robert Batts >Assignee: Subhankar Biswas > Fix For: 1.1.0 > > > The RabbitMQ source and sink should have the same capabilities in terms of > establishing a connection, currently the sink is lacking connection > parameters that are available on the source. Additionally, VirtualHost should > be an offered parameter for multi-tenant RabbitMQ clusters (if not specified > it goes to the vhost '/'). > Connection Parameters > === > - Host - Offered on both > - Port - Source only > - Virtual Host - Neither > - User - Source only > - Password - Source only > Additionally, it might be worth offer the URI as a valid constructor because > that would offer all 5 of the above parameters in a single String. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
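The URI idea from the description can be illustrated with a short sketch: a single AMQP URI of the form `amqp://user:password@host:port/vhost` does carry all five connection parameters. This is a hedged illustration using `java.net.URI`, not the RabbitMQ client's own URI handling; the class and defaults below are assumptions for the example.

```java
import java.net.URI;

// Sketch: extract {host, port, virtualHost, user, password} from one AMQP URI.
class AmqpUriParser {
    static String[] parse(String amqpUri) {
        URI u = URI.create(amqpUri);
        String[] userInfo = u.getUserInfo() != null
                ? u.getUserInfo().split(":", 2)
                : new String[] {"guest", "guest"};     // assumed default credentials
        // An absent or empty path maps to the default vhost "/".
        String vhost = (u.getPath() == null || u.getPath().length() <= 1)
                ? "/"
                : u.getPath().substring(1);
        return new String[] {u.getHost(), String.valueOf(u.getPort()), vhost, userInfo[0], userInfo[1]};
    }
}
```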
[GitHub] flink issue #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger support to...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2088 I think we also need aggregation functions for `DECIMAL`, otherwise this won't work: ``` SELECT sum(myInt * 1.23) FROM MyTable ```
[jira] [Updated] (FLINK-4066) RabbitMQ source, customize queue arguments
[ https://issues.apache.org/jira/browse/FLINK-4066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4066: -- Fix Version/s: 1.1.0 > RabbitMQ source, customize queue arguments > -- > > Key: FLINK-4066 > URL: https://issues.apache.org/jira/browse/FLINK-4066 > Project: Flink > Issue Type: Improvement >Reporter: Kanstantsin Kamkou >Priority: Minor > Labels: queue, rabbitmq > Fix For: 1.1.0 > > > Please, add a functionality to customize the line (custom attributes for a > queue). > {code}channel.queueDeclare(queueName, false, false, false, null);{code} > Thank you. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4066) RabbitMQ source, customize queue arguments
[ https://issues.apache.org/jira/browse/FLINK-4066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4066. --- Resolution: Duplicate > RabbitMQ source, customize queue arguments > -- > > Key: FLINK-4066 > URL: https://issues.apache.org/jira/browse/FLINK-4066 > Project: Flink > Issue Type: Improvement >Reporter: Kanstantsin Kamkou >Priority: Minor > Labels: queue, rabbitmq > Fix For: 1.1.0 > > > Please, add a functionality to customize the line (custom attributes for a > queue). > {code}channel.queueDeclare(queueName, false, false, false, null);{code} > Thank you. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4066) RabbitMQ source, customize queue arguments
[ https://issues.apache.org/jira/browse/FLINK-4066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327585#comment-15327585 ] Robert Metzger commented on FLINK-4066: --- I'm closing this JIRA as invalid. The requested feature has been added in FLINK-4025 > RabbitMQ source, customize queue arguments > -- > > Key: FLINK-4066 > URL: https://issues.apache.org/jira/browse/FLINK-4066 > Project: Flink > Issue Type: Improvement >Reporter: Kanstantsin Kamkou >Priority: Minor > Labels: queue, rabbitmq > Fix For: 1.1.0 > > > Please, add a functionality to customize the line (custom attributes for a > queue). > {code}channel.queueDeclare(queueName, false, false, false, null);{code} > Thank you. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4064) Allow calling MetricGroup methods multiple times
[ https://issues.apache.org/jira/browse/FLINK-4064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327587#comment-15327587 ] Aljoscha Krettek commented on FLINK-4064: - You're right, that might be the way to go. > Allow calling MetricGroup methods multiple times > > > Key: FLINK-4064 > URL: https://issues.apache.org/jira/browse/FLINK-4064 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek > > Right now, the methods on {{MetricGroup}} can only be called once, i.e. I > have to keep the result of {{MetricGroup.counter(name)}} and use that to > report metrics. For some cases, such as adding metrics support in triggers, > it is necessary to allow calling a method multiple times and return the same > metric object. On the first call, a new metric object would be created, > subsequent calls would return that metric object. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
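The proposed "create on first call, return the same object afterwards" semantics can be sketched with a map-backed group. The class and method names are hypothetical, and an `AtomicLong` stands in for a real metric type; this is not Flink's `MetricGroup` implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of idempotent metric registration: the first call for a name creates
// the metric, subsequent calls return the same object.
class SimpleMetricGroup {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    AtomicLong counter(String name) {
        // computeIfAbsent makes repeated registration of the same name safe
        // and always hands back the identical metric instance.
        return counters.computeIfAbsent(name, n -> new AtomicLong());
    }
}
```

A trigger could then call `counter("fires")` on every invocation without keeping a reference to the metric itself.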
[jira] [Commented] (FLINK-4028) AbstractAlignedProcessingTimeWindowOperator creates wrong TimeWindow
[ https://issues.apache.org/jira/browse/FLINK-4028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327584#comment-15327584 ] ASF GitHub Bot commented on FLINK-4028: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2097 [FLINK-4028] Create correct TimeWindow in AbstractAlignedProcessingTimeWindowOperator You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink fix-aligned-time-window Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2097.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2097 commit 6180df6e11b7c27f62c9e25a1d7888c0b097674b Author: Aljoscha Krettek Date: 2016-06-13T10:03:32Z [FLINK-4028] Create correct TimeWindow in AbstractAlignedProcessingTimeWindowOperator > AbstractAlignedProcessingTimeWindowOperator creates wrong TimeWindow > > > Key: FLINK-4028 > URL: https://issues.apache.org/jira/browse/FLINK-4028 > Project: Flink > Issue Type: Bug > Components: Windowing Operators >Affects Versions: 1.0.0, 1.1.0, 1.0.1, 1.0.2, 1.0.3 >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.1.0 > > > In {{computeWindow}} we have this code: {{new TimeWindow(timestamp, timestamp > + windowSize)}}. This is wrong because {{timestamp}} is actually the end > timestamp of the window. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
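The bug in the description can be summarized with a simplified stand-in for Flink's `TimeWindow` (this sketch is not the actual operator code): since `timestamp` is the window's end, the window should span `[timestamp - windowSize, timestamp)`, not `[timestamp, timestamp + windowSize)`.

```java
// Simplified stand-in showing the corrected window bounds.
class TimeWindow {
    final long start;
    final long end;

    TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    static TimeWindow computeWindow(long timestamp, long windowSize) {
        // `timestamp` is the END of the window, so subtract the size for the start.
        return new TimeWindow(timestamp - windowSize, timestamp);
    }
}
```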
[jira] [Commented] (FLINK-4065) Unstable Kafka09ITCase.testMultipleSourcesOnePartition test
[ https://issues.apache.org/jira/browse/FLINK-4065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327583#comment-15327583 ] Kostas Kloudas commented on FLINK-4065: --- Thanks [~rmetzger] > Unstable Kafka09ITCase.testMultipleSourcesOnePartition test > --- > > Key: FLINK-4065 > URL: https://issues.apache.org/jira/browse/FLINK-4065 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Kostas Kloudas >Priority: Critical > Labels: test-stability > > Sometimes the Kafka09ITCase.testMultipleSourcesOnePartition test fails on > travis. Here is a log of such a failure: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/136758707/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2097: [FLINK-4028] Create correct TimeWindow in Abstract...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2097 [FLINK-4028] Create correct TimeWindow in AbstractAlignedProcessingTimeWindowOperator You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink fix-aligned-time-window Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2097.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2097 commit 6180df6e11b7c27f62c9e25a1d7888c0b097674b Author: Aljoscha Krettek Date: 2016-06-13T10:03:32Z [FLINK-4028] Create correct TimeWindow in AbstractAlignedProcessingTimeWindowOperator --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3971) Aggregates handle null values incorrectly.
[ https://issues.apache.org/jira/browse/FLINK-3971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327580#comment-15327580 ] ASF GitHub Bot commented on FLINK-3971: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2049 Thanks for the fast update @gallenvara. Good to merge! > Aggregates handle null values incorrectly. > -- > > Key: FLINK-3971 > URL: https://issues.apache.org/jira/browse/FLINK-3971 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: GaoLun >Priority: Critical > Fix For: 1.1.0 > > > Table API and SQL aggregates are supposed to ignore null values, e.g., > {{sum(1,2,null,4)}} is supposed to return {{7}}. > The current implementation is correct if at least one valid value is > present; however, it is incorrect if only null values are aggregated. {{sum(null, > null, null)}} should return {{null}} instead of {{0}}. > Currently only the Count aggregate handles the case of null-values-only > correctly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
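The intended null semantics can be sketched in plain Java as follows; this is illustrative only and not the Table API's actual aggregate implementation:

```java
import java.util.List;

class NullAwareSum {

    // SQL-style SUM: null inputs are skipped; if every input is null
    // (or the input is empty), the result is null rather than 0.
    static Integer sum(List<Integer> values) {
        Integer acc = null;
        for (Integer v : values) {
            if (v == null) {
                continue; // null values are ignored
            }
            acc = (acc == null) ? v : acc + v;
        }
        return acc; // still null if no non-null value was seen
    }
}
```

So `sum(1, 2, null, 4)` yields 7, while `sum(null, null, null)` yields null, matching the behavior the issue asks for.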
[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2049 Thanks for the fast update @gallenvara. Good to merge!
[jira] [Created] (FLINK-4067) Add version header to savepoints
Ufuk Celebi created FLINK-4067: -- Summary: Add version header to savepoints Key: FLINK-4067 URL: https://issues.apache.org/jira/browse/FLINK-4067 Project: Flink Issue Type: Improvement Affects Versions: 1.0.3 Reporter: Ufuk Celebi Fix For: 1.1.0 Adding a header with version information to savepoints ensures that we can migrate savepoints between Flink versions in the future (for example when changing internal serialization formats between versions). After talking with Till, we propose to add the following meta data: - Magic number (int): identify data as savepoint - Version (int): savepoint version (independent of Flink version) - Data Offset (int): specifies at which point the actual savepoint data starts. With this, we can allow future Flink versions to add fields to the header without breaking stuff, e.g. Flink 1.1 could read savepoints of Flink 2.0. For Flink 1.0 savepoint support, we have to try reading the savepoints without a header before failing if we don't find the magic number. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
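A sketch of what writing and reading such a header could look like. The magic value, version handling, and three-int layout below are assumptions for illustration, not the format Flink eventually ships:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class SavepointHeaderSketch {

    // Hypothetical constants, not Flink's actual values.
    static final int MAGIC = 0x5AFE0001;
    static final int HEADER_BYTES = 3 * Integer.BYTES; // magic + version + data offset

    static byte[] write(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(MAGIC);        // identify the data as a savepoint
        out.writeInt(version);      // savepoint version, independent of the Flink version
        out.writeInt(HEADER_BYTES); // offset at which the actual savepoint data starts
        out.write(payload);
        return bos.toByteArray();
    }

    static byte[] read(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        if (in.readInt() != MAGIC) {
            // A real reader would fall back to the headerless Flink 1.0 format here.
            throw new IOException("Not a versioned savepoint");
        }
        in.readInt();                 // version; would select the right deserializer
        int dataOffset = in.readInt();
        // Skip header fields a newer version may have appended, so an older
        // reader still finds the payload via the data offset.
        in.skipBytes(dataOffset - HEADER_BYTES);
        byte[] payload = new byte[bytes.length - dataOffset];
        in.readFully(payload);
        return payload;
    }
}
```

The data offset is what makes the format forward-compatible: a reader that does not understand newly added header fields can still jump straight to the payload.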
[jira] [Commented] (FLINK-3749) Improve decimal handling
[ https://issues.apache.org/jira/browse/FLINK-3749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327574#comment-15327574 ] ASF GitHub Bot commented on FLINK-3749: --- Github user twalthr closed the pull request at: https://github.com/apache/flink/pull/1881 > Improve decimal handling > > > Key: FLINK-3749 > URL: https://issues.apache.org/jira/browse/FLINK-3749 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther > > The current decimal handling is too restrictive and does not allow literals > such as "11.2". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3749) Improve decimal handling
[ https://issues.apache.org/jira/browse/FLINK-3749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327573#comment-15327573 ] ASF GitHub Bot commented on FLINK-3749: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/1881 Yes, I will close this... > Improve decimal handling > > > Key: FLINK-3749 > URL: https://issues.apache.org/jira/browse/FLINK-3749 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther > > The current decimal handling is too restrictive and does not allow literals > such as "11.2". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #1881: [FLINK-3749] [table] Improve decimal handling
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/1881 Yes, I will close this...
[GitHub] flink pull request #1881: [FLINK-3749] [table] Improve decimal handling
Github user twalthr closed the pull request at: https://github.com/apache/flink/pull/1881
[jira] [Closed] (FLINK-4065) Unstable Kafka09ITCase.testMultipleSourcesOnePartition test
[ https://issues.apache.org/jira/browse/FLINK-4065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger closed FLINK-4065. - Resolution: Invalid > Unstable Kafka09ITCase.testMultipleSourcesOnePartition test > --- > > Key: FLINK-4065 > URL: https://issues.apache.org/jira/browse/FLINK-4065 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Kostas Kloudas >Priority: Critical > Labels: test-stability > > Sometimes the Kafka09ITCase.testMultipleSourcesOnePartition test fails on > travis. Here is a log of such a failure: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/136758707/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4065) Unstable Kafka09ITCase.testMultipleSourcesOnePartition test
[ https://issues.apache.org/jira/browse/FLINK-4065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327572#comment-15327572 ] Robert Metzger commented on FLINK-4065: --- Four days ago, I improved the Kafka tests' stability a bit (https://issues.apache.org/jira/browse/FLINK-3530) and also changed the behavior of the mentioned test. From the logs it seems you've been running the tests on a slightly outdated master. I'll close this issue as invalid for now. Please reopen it if the error occurs again. > Unstable Kafka09ITCase.testMultipleSourcesOnePartition test > --- > > Key: FLINK-4065 > URL: https://issues.apache.org/jira/browse/FLINK-4065 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Kostas Kloudas >Priority: Critical > Labels: test-stability > > Sometimes the Kafka09ITCase.testMultipleSourcesOnePartition test fails on > travis. Here is a log of such a failure: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/136758707/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327549#comment-15327549 ] ASF GitHub Bot commented on FLINK-3859: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2088 Thanks for the PR, @twalthr! Looks mostly good. I found a few cases that need to be fixed. We should also think about how to handle SQL `DECIMAL` types with fixed precision / scale, e.g., how to handle something like `SELECT myDouble AS DECIMAL(4,2)`. Do you think this could be easily added with this PR or rather be fixed in a later issue? > Add BigDecimal/BigInteger support to Table API > -- > > Key: FLINK-3859 > URL: https://issues.apache.org/jira/browse/FLINK-3859 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Since FLINK-3786 has been solved, we can now start integrating > BigDecimal/BigInteger into the Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger support to...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2088 Thanks for the PR, @twalthr! Looks mostly good. I found a few cases that need to be fixed. We should also think about how to handle SQL `DECIMAL` types with fixed precision / scale, e.g., how to handle something like `SELECT myDouble AS DECIMAL(4,2)`. Do you think this could be easily added with this PR or rather be fixed in a later issue?
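In terms of `java.math.BigDecimal`, a fixed precision/scale cast could behave roughly like this. This is an illustrative sketch, not the Table API's implementation; the rounding mode and the overflow-as-error behavior are assumptions (SQL systems differ on both):

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class DecimalCastSketch {

    // DECIMAL(precision, scale): round to 'scale' fraction digits and
    // reject values whose total number of digits exceeds 'precision'.
    static BigDecimal castToDecimal(double value, int precision, int scale) {
        BigDecimal d = BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_UP);
        if (d.precision() > precision) {
            throw new ArithmeticException(
                "value " + value + " does not fit DECIMAL(" + precision + ", " + scale + ")");
        }
        return d;
    }
}
```

For example, casting 12.345 to DECIMAL(4,2) would yield 12.35, while a value like 123.456 would overflow the four-digit precision.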
[GitHub] flink pull request #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger sup...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2088#discussion_r66807225 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/DecimalTypeTest.scala --- @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.expression + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.expression.utils.ExpressionTestBase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.expressions.Literal +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.junit.Test + +class DecimalTypeTest extends ExpressionTestBase { + + @Test + def testDecimalLiterals(): Unit = { +// implicit double +testAllApis( + 11.2, + "11.2", + "11.2", + "11.2") + +// implicit double +testAllApis( + 0.7623533651719233, + "0.7623533651719233", + "0.7623533651719233", + "0.7623533651719233") + +// explicit decimal (with precision of 19) +testAllApis( + BigDecimal("1234567891234567891"), + "1234567891234567891p", + "1234567891234567891", + "1234567891234567891") + +// explicit decimal (high precision, not SQL compliant) +testTableApi( + BigDecimal("123456789123456789123456789"), + "123456789123456789123456789p", + "123456789123456789123456789") + +// explicit decimal (high precision, not SQL compliant) +testTableApi( + BigDecimal("12.3456789123456789123456789"), + "12.3456789123456789123456789p", + "12.3456789123456789123456789") + } + + @Test + def testDecimalBorders(): Unit = { +testAllApis( + Double.MaxValue, + Double.MaxValue.toString, + Double.MaxValue.toString, + Double.MaxValue.toString) + +testAllApis( + Double.MinValue, + Double.MinValue.toString, + Double.MinValue.toString, + Double.MinValue.toString) + +testAllApis( + Double.MinValue.cast(FLOAT_TYPE_INFO), + s"${Double.MinValue}.cast(FLOAT)", + s"CAST(${Double.MinValue} AS FLOAT)", + Float.NegativeInfinity.toString) + +testAllApis( + Byte.MinValue.cast(BYTE_TYPE_INFO), + s"(${Byte.MinValue}).cast(BYTE)", + s"CAST(${Byte.MinValue} AS TINYINT)", + Byte.MinValue.toString) + +testAllApis( + Byte.MinValue.cast(BYTE_TYPE_INFO) - 
1.cast(BYTE_TYPE_INFO), + s"(${Byte.MinValue}).cast(BYTE) - (1).cast(BYTE)", + s"CAST(${Byte.MinValue} AS TINYINT) - CAST(1 AS TINYINT)", + Byte.MaxValue.toString) + +testAllApis( + Short.MinValue.cast(SHORT_TYPE_INFO), + s"(${Short.MinValue}).cast(SHORT)", + s"CAST(${Short.MinValue} AS SMALLINT)", + Short.MinValue.toString) + +testAllApis( + Int.MinValue.cast(INT_TYPE_INFO) - 1, + s"(${Int.MinValue}).cast(INT) - 1", + s"CAST(${Int.MinValue} AS INT) - 1", + Int.MaxValue.toString) + +testAllApis( + Long.MinValue.cast(LONG_TYPE_INFO), + s"(${Long.MinValue}L).cast(LONG)", + s"CAST(${Long.MinValue} AS BIGINT)", + Long.MinValue.toString) + } + + @Test + def testDecimalCasting(): Unit = { +// from String +testTableApi( + "123456789123456789123456789".cast(BIG_DEC_TYPE_INFO), + "'123456789123456789123456789'.cast(DECIMAL)", + "123456789123456789123456789") + +// from double +testAllApis( + 'f3.cast(BIG_DEC_TYPE_INFO), + "f3.cast(DECIMAL)", + "CAST(f3 AS DECIMAL)", + "4.2") + +// to double
[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327547#comment-15327547 ] ASF GitHub Bot commented on FLINK-3859: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2088#discussion_r66807225 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/DecimalTypeTest.scala ---
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327544#comment-15327544 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66806299 --- Diff: docs/apis/cli.md --- @@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs. configuration. -r,--running Show only running programs and their JobIDs -s,--scheduled Show only scheduled programs and their JobIDs + Additional arguments if -m yarn-cluster is set: + -yid YARN application ID of Flink YARN session to + connect to. Must not be set if JobManager HA + is used. In this case, JobManager RPC + location is automatically retrieved from + Zookeeper. --- End diff -- Same as for `list` and `info`. I agree this verbosity is not very nice. Looking into how I can make this look nicer. > Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters > -- > > Key: FLINK-3937 > URL: https://issues.apache.org/jira/browse/FLINK-3937 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Maximilian Michels >Priority: Trivial > Attachments: improve_flink_cli_yarn_integration.patch > > > Currently, flink cli can't figure out JobManager RPC location for > Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop > subcommands are hard to invoke if you only know the YARN application ID. As > an improvement, I suggest adding a -yid option to the > mentioned subcommands that can be used together with -m yarn-cluster. Flink > cli would then retrieve JobManager RPC location from YARN ResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66806299 --- Diff: docs/apis/cli.md --- @@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs. configuration. -r,--running Show only running programs and their JobIDs -s,--scheduled Show only scheduled programs and their JobIDs + Additional arguments if -m yarn-cluster is set: + -yid YARN application ID of Flink YARN session to + connect to. Must not be set if JobManager HA + is used. In this case, JobManager RPC + location is automatically retrieved from + Zookeeper. --- End diff -- Same as for `list` and `info`. I agree this verbosity is not very nice. Looking into how I can make this look nicer.
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327539#comment-15327539 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805960 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java --- @@ -181,11 +198,30 @@ public boolean getPrintStatusDuringExecution() { } /** -* @return -1 if unknown. The maximum number of available processing slots at the Flink cluster -* connected to this client. +* Gets the current JobManager address from the Flink configuration (may change in case of a HA setup). +* @return The address (host and port) of the leading JobManager */ - public int getMaxSlots() { - return this.maxSlots; + public InetSocketAddress getJobManagerAddressFromConfig() { + try { + String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + return new InetSocketAddress(hostName, port); --- End diff -- Fixed. > Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters > -- > > Key: FLINK-3937 > URL: https://issues.apache.org/jira/browse/FLINK-3937 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Maximilian Michels >Priority: Trivial > Attachments: improve_flink_cli_yarn_integration.patch > > > Currently, flink cli can't figure out JobManager RPC location for > Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop > subcommands are hard to invoke if you only know the YARN application ID. As > an improvement, I suggest adding a -yid option to the > mentioned subcommands that can be used together with -m yarn-cluster. Flink > cli would then retrieve JobManager RPC location from YARN ResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327537#comment-15327537 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2085 Thanks for the review! I've addressed almost all your comments and will ping you once the PR gets updated. Some more comments: > (1) When listing the yarn-session.sh cli options, I could not see the newly added argument Fixed. The argument was not printed. The code now prints all available options. >Starting a YARN session works. Also, connecting with a second YARN session works. However, if I'm stopping the session from the second session client, the first one does not notice this. It would be good if all connected clients receive the messages, instead of the last one connected. Yes, it would be nice to have that feature, but I think it is out of the scope of this PR. >Detached per job submission is not working (./bin/flink run -m yarn-cluster -yd -yn 1 ./examples/batch/WordCount.jar) You're running a WordCount with interactive job submission detached. That doesn't work because the plan can't be extracted in interactive programs (programs which use count/collect/print). >While trying out your code, I noticed that the .yarn-properties file is not properly deleted, even though my previous yarn session was shutting down correctly: I'll look into it. I know that this is also an issue in the current master. I had the properties file lingering around many times. I think it is not properly cleaned up in the ShutdownHook. 
> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters > -- > > Key: FLINK-3937 > URL: https://issues.apache.org/jira/browse/FLINK-3937 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Maximilian Michels >Priority: Trivial > Attachments: improve_flink_cli_yarn_integration.patch > > > Currently, flink cli can't figure out JobManager RPC location for > Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop > subcommands are hard to invoke if you only know the YARN application ID. As > an improvement, I suggest adding a -yid option to the > mentioned subcommands that can be used together with -m yarn-cluster. Flink > cli would then retrieve JobManager RPC location from YARN ResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327538#comment-15327538 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805953 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; + + +/** + * Custom command-line interface to load hooks for the command-line interface. + */ +public interface CustomCommandLine { + + /** +* Returns a unique identifier for this custom command-line. +* @return An unique identifier string +*/ + String getIdentifier(); + + /** +* Adds custom options to the existing run options. +* @param baseOptions The existing options. +*/ + void addRunOptions(Options baseOptions); + + /** +* Adds custom options to the existing general options. 
+* @param baseOptions The existing options. +*/ + void addGeneralOptions(Options baseOptions); + + /** +* Retrieves a client for a running cluster +* @param commandLine The command-line parameters from the CliFrontend +* @param config The Flink config +* @return Client if a cluster could be retrieve, null otherwise --- End diff -- Thanks. > Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters > -- > > Key: FLINK-3937 > URL: https://issues.apache.org/jira/browse/FLINK-3937 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Maximilian Michels >Priority: Trivial > Attachments: improve_flink_cli_yarn_integration.patch > > > Currently, flink cli can't figure out JobManager RPC location for > Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop > subcommands are hard to invoke if you only know the YARN application ID. As > an improvement, I suggest adding a -yid option to the > mentioned subcommands that can be used together with -m yarn-cluster. Flink > cli would then retrieve JobManager RPC location from YARN ResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805960 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java --- @@ -181,11 +198,30 @@ public boolean getPrintStatusDuringExecution() { } /** -* @return -1 if unknown. The maximum number of available processing slots at the Flink cluster -* connected to this client. +* Gets the current JobManager address from the Flink configuration (may change in case of a HA setup). +* @return The address (host and port) of the leading JobManager */ - public int getMaxSlots() { - return this.maxSlots; + public InetSocketAddress getJobManagerAddressFromConfig() { + try { + String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + return new InetSocketAddress(hostName, port); --- End diff -- Fixed.
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805953 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; + + +/** + * Custom command-line interface to load hooks for the command-line interface. + */ +public interface CustomCommandLine { + + /** +* Returns a unique identifier for this custom command-line. +* @return An unique identifier string +*/ + String getIdentifier(); + + /** +* Adds custom options to the existing run options. +* @param baseOptions The existing options. +*/ + void addRunOptions(Options baseOptions); + + /** +* Adds custom options to the existing general options. +* @param baseOptions The existing options. 
+*/ + void addGeneralOptions(Options baseOptions); + + /** +* Retrieves a client for a running cluster +* @param commandLine The command-line parameters from the CliFrontend +* @param config The Flink config +* @return Client if a cluster could be retrieve, null otherwise --- End diff -- Thanks.
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327536#comment-15327536 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805706 --- Diff: docs/setup/yarn_setup.md --- @@ -143,6 +143,34 @@ Note that in this case its not possible to stop the YARN session using Flink. Use the YARN utilities (`yarn application -kill `) to stop the YARN session. + Attach to an existing Session + +Use the following command to start a session + +~~~bash +./bin/yarn-session.sh +~~~ + +This command will show you the following overview: + +~~~bash +Usage: + Required + -id,--applicationId YARN application Id --- End diff -- Yes, same as above. Fixed. > Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters > -- > > Key: FLINK-3937 > URL: https://issues.apache.org/jira/browse/FLINK-3937 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Maximilian Michels >Priority: Trivial > Attachments: improve_flink_cli_yarn_integration.patch > > > Currently, flink cli can't figure out JobManager RPC location for > Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop > subcommands are hard to invoke if you only know the YARN application ID. As > an improvement, I suggest adding a -yid option to the > mentioned subcommands that can be used together with -m yarn-cluster. Flink > cli would then retrieve JobManager RPC location from YARN ResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2085 Thanks for the review! I've addressed almost all your comments and will ping you once the PR gets updated. Some more comments: > (1) When listing the yarn-session.sh cli options, I could not see the newly added argument Fixed. The argument was not printed. The code now prints all available options. >Starting a YARN session works. Also, connecting with a second YARN session works. However, if I'm stopping the session from the second session client, the first one does not notice this. It would be good if all connected clients receive the messages, instead of the last one connected. Yes, it would be nice to have that feature, but I think it is out of the scope of this PR. >Detached per job submission is not working (./bin/flink run -m yarn-cluster -yd -yn 1 ./examples/batch/WordCount.jar) You're running a WordCount with interactive job submission detached. That doesn't work because the plan can't be extracted from interactive programs (programs which use count/collect/print). >While trying out your code, I noticed that the .yarn-properties file is not properly deleted, even though my previous yarn session was shutting down correctly: I'll look into it. I know that this is also an issue in the current master. I have had the properties file lingering around many times. I think it is not properly cleaned up in the ShutdownHook.
[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327535#comment-15327535 ] ASF GitHub Bot commented on FLINK-3859: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2088#discussion_r66805682 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -122,14 +125,14 @@ object ScalarOperators { right: GeneratedExpression) : GeneratedExpression = { generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { - if (isString(left) && isString(right)) { + if (isReference(left) && isReference(right)) { (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0" } - else if (isNumeric(left) && isNumeric(right)) { + else if (isNumeric(left.resultType) && isNumeric(right.resultType)) { --- End diff -- If `left` or `right` is a `BigDecimal` invalid code is generated. > Caused by: org.codehaus.commons.compiler.CompileException: Line 64, Column 0: Cannot compare types "int" and "java.math.BigDecimal" This happens if you replace the query in `FilterITCase.testFilterOnInteger()` by `SELECT * FROM MyTable WHERE a < 4.25`. `MyTable.a` is an `INTEGER` column and not auto-casted to `DECIMAL`. Auto-casting seems to work for expressions in the `SELECT` clause. > Add BigDecimal/BigInteger support to Table API > -- > > Key: FLINK-3859 > URL: https://issues.apache.org/jira/browse/FLINK-3859 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Critical > > Since FLINK-3786 has been solved, we can now start integrating > BigDecimal/BigInteger into the Table API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805706 --- Diff: docs/setup/yarn_setup.md --- @@ -143,6 +143,34 @@ Note that in this case its not possible to stop the YARN session using Flink. Use the YARN utilities (`yarn application -kill `) to stop the YARN session. + Attach to an existing Session + +Use the following command to start a session + +~~~bash +./bin/yarn-session.sh +~~~ + +This command will show you the following overview: + +~~~bash +Usage: + Required + -id,--applicationId YARN application Id --- End diff -- Yes, same as above. Fixed.
[GitHub] flink pull request #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger sup...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2088#discussion_r66805682 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -122,14 +125,14 @@ object ScalarOperators { right: GeneratedExpression) : GeneratedExpression = { generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { - if (isString(left) && isString(right)) { + if (isReference(left) && isReference(right)) { (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0" } - else if (isNumeric(left) && isNumeric(right)) { + else if (isNumeric(left.resultType) && isNumeric(right.resultType)) { --- End diff -- If `left` or `right` is a `BigDecimal`, invalid code is generated. > Caused by: org.codehaus.commons.compiler.CompileException: Line 64, Column 0: Cannot compare types "int" and "java.math.BigDecimal" This happens if you replace the query in `FilterITCase.testFilterOnInteger()` with `SELECT * FROM MyTable WHERE a < 4.25`. `MyTable.a` is an `INTEGER` column and not auto-casted to `DECIMAL`. Auto-casting seems to work for expressions in the `SELECT` clause.
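The compile error quoted above comes from comparing a primitive with a reference type: Java has no `<` operator between `int` and `java.math.BigDecimal`. A small illustrative sketch (not the Table API's generated code) of why the generated comparison must first widen the primitive to `BigDecimal` and then use `compareTo`:

```java
import java.math.BigDecimal;

public class DecimalComparison {

    // Illustration only: '(a < dec)' would not compile for a primitive int
    // and a BigDecimal, so the int is widened to BigDecimal first and the
    // relation is expressed via compareTo, as in the review discussion.
    static boolean lessThan(int a, BigDecimal threshold) {
        return BigDecimal.valueOf(a).compareTo(threshold) < 0;
    }

    public static void main(String[] args) {
        BigDecimal threshold = new BigDecimal("4.25");
        System.out.println(lessThan(4, threshold)); // 4 < 4.25
        System.out.println(lessThan(5, threshold)); // 5 < 4.25
    }
}
```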
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327534#comment-15327534 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805499 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -300,18 +309,82 @@ public static boolean allocateResource(int[] nodeManagers, int toAllocate) { return false; } - @Override public void setDetachedMode(boolean detachedMode) { this.detached = detachedMode; } - @Override - public boolean isDetached() { + public boolean isDetachedMode() { return detached; } + + /** +* Gets a Hadoop Yarn client +* @return Returns a YarnClient which has to be shutdown manually +*/ + private static YarnClient getYarnClient(Configuration conf) { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + + /** +* Retrieves the Yarn application and cluster from the config +* @param config The config with entries to retrieve the cluster +* @return YarnClusterClient +* @deprecated This should be removed in the future +*/ + public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws Exception { + String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + + if (jobManagerHost != null && jobManagerPort != -1) { + + YarnClient yarnClient = getYarnClient(conf); + List applicationReports = yarnClient.getApplications(); + for (ApplicationReport report : applicationReports) { + if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { + LOG.info("Found application '{}' " + + "with JobManager host name '{}' and port '{}' from Yarn properties file.", + report.getApplicationId(), 
jobManagerHost, jobManagerPort); + return retrieve(report.getApplicationId().toString()); + } + } + + } + return null; --- End diff -- Same as above. Fixed.
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327530#comment-15327530 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805417 --- Diff: docs/apis/cli.md --- @@ -105,6 +105,10 @@ The command line can be used to ./bin/flink list -r +- List running Flink jobs inside Flink YARN session: + +./bin/flink list -m yarn-cluster -yid -r --- End diff -- Thank you, I'll look into this.
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327533#comment-15327533 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805492 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -300,18 +309,82 @@ public static boolean allocateResource(int[] nodeManagers, int toAllocate) { return false; } - @Override public void setDetachedMode(boolean detachedMode) { this.detached = detachedMode; } - @Override - public boolean isDetached() { + public boolean isDetachedMode() { return detached; } + + /** +* Gets a Hadoop Yarn client +* @return Returns a YarnClient which has to be shutdown manually +*/ + private static YarnClient getYarnClient(Configuration conf) { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + + /** +* Retrieves the Yarn application and cluster from the config +* @param config The config with entries to retrieve the cluster +* @return YarnClusterClient +* @deprecated This should be removed in the future +*/ + public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws Exception { + String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + + if (jobManagerHost != null && jobManagerPort != -1) { + + YarnClient yarnClient = getYarnClient(conf); + List applicationReports = yarnClient.getApplications(); + for (ApplicationReport report : applicationReports) { + if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { + LOG.info("Found application '{}' " + + "with JobManager host name '{}' and port '{}' from Yarn properties file.", + report.getApplicationId(), 
jobManagerHost, jobManagerPort); + return retrieve(report.getApplicationId().toString()); + } + } + --- End diff -- Good idea.
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327531#comment-15327531 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805455 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -415,62 +513,83 @@ public int run(String[] args) { } System.out.println(description); return 0; + } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { // TODO RM --- End diff -- Sorry, I added this TODO while fixing some of your first comments.
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805492 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -300,18 +309,82 @@ public static boolean allocateResource(int[] nodeManagers, int toAllocate) { return false; } - @Override public void setDetachedMode(boolean detachedMode) { this.detached = detachedMode; } - @Override - public boolean isDetached() { + public boolean isDetachedMode() { return detached; } + + /** +* Gets a Hadoop Yarn client +* @return Returns a YarnClient which has to be shutdown manually +*/ + private static YarnClient getYarnClient(Configuration conf) { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + + /** +* Retrieves the Yarn application and cluster from the config +* @param config The config with entries to retrieve the cluster +* @return YarnClusterClient +* @deprecated This should be removed in the future +*/ + public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws Exception { + String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + + if (jobManagerHost != null && jobManagerPort != -1) { + + YarnClient yarnClient = getYarnClient(conf); + List applicationReports = yarnClient.getApplications(); + for (ApplicationReport report : applicationReports) { + if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { + LOG.info("Found application '{}' " + + "with JobManager host name '{}' and port '{}' from Yarn properties file.", + report.getApplicationId(), jobManagerHost, jobManagerPort); + return retrieve(report.getApplicationId().toString()); + } + } + --- End diff -- Good idea. 
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327532#comment-15327532 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805479 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java --- @@ -211,18 +185,45 @@ public void run() { Runtime.getRuntime().addShutdownHook(clientShutdownHook); isConnected = true; + + logAndSysout("Waiting until all TaskManagers have connected"); + + while(true) { --- End diff -- Yes, I wanted to bring up the Yarn session only once the cluster is ready. It is a semantic change but IMHO transparent to the user. There is no disadvantage to waiting until the cluster is ready. Ultimately, it would be nice to get rid of all waiting but we're not quite there yet.
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327529#comment-15327529 ] ASF GitHub Bot commented on FLINK-3937: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805382 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java --- @@ -211,18 +185,45 @@ public void run() { Runtime.getRuntime().addShutdownHook(clientShutdownHook); isConnected = true; + + logAndSysout("Waiting until all TaskManagers have connected"); --- End diff -- Thank you. I fixed that by using a non-static variable for the logger and dynamically retrieving the class name, i.e. ```java private final Logger LOG = LoggerFactory.getLogger(getClass()); ```
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805499 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -300,18 +309,82 @@ public static boolean allocateResource(int[] nodeManagers, int toAllocate) { return false; } - @Override public void setDetachedMode(boolean detachedMode) { this.detached = detachedMode; } - @Override - public boolean isDetached() { + public boolean isDetachedMode() { return detached; } + + /** +* Gets a Hadoop Yarn client +* @return Returns a YarnClient which has to be shutdown manually +*/ + private static YarnClient getYarnClient(Configuration conf) { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + + /** +* Retrieves the Yarn application and cluster from the config +* @param config The config with entries to retrieve the cluster +* @return YarnClusterClient +* @deprecated This should be removed in the future +*/ + public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws Exception { + String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + + if (jobManagerHost != null && jobManagerPort != -1) { + + YarnClient yarnClient = getYarnClient(conf); + List applicationReports = yarnClient.getApplications(); + for (ApplicationReport report : applicationReports) { + if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { + LOG.info("Found application '{}' " + + "with JobManager host name '{}' and port '{}' from Yarn properties file.", + report.getApplicationId(), jobManagerHost, jobManagerPort); + return retrieve(report.getApplicationId().toString()); + } + } + + } + return null; --- End diff -- Same as above. Fixed. 
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805479 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java --- @@ -211,18 +185,45 @@ public void run() { Runtime.getRuntime().addShutdownHook(clientShutdownHook); isConnected = true; + + logAndSysout("Waiting until all TaskManagers have connected"); + + while(true) { --- End diff -- Yes, I wanted to bring up the Yarn session only once the cluster is ready. It is a semantic change but IMHO transparent to the user. There is no disadvantage to waiting until the cluster is ready. Ultimately, it would be nice to get rid of all waiting but we're not quite there yet.
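The wait loop discussed above can be sketched as a bounded polling loop rather than an unconditional `while(true)`. This is an illustrative stand-in, not the PR's actual code: the `IntSupplier` status query and the timeout handling are assumptions.

```java
import java.util.function.IntSupplier;

public class ClusterWait {

    // Polls a (hypothetical) cluster-status query until the reported
    // TaskManager count reaches the target, or a timeout elapses.
    static boolean waitForTaskManagers(IntSupplier connectedCount, int target,
                                       long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (connectedCount.getAsInt() >= target) {
                return true; // cluster is ready
            }
            Thread.sleep(pollMillis); // back off before the next status query
        }
        return false; // timed out before the cluster became ready
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate TaskManagers joining one by one.
        int[] connected = {0};
        IntSupplier status = () -> ++connected[0];
        System.out.println(waitForTaskManagers(status, 3, 1000, 10));
    }
}
```

Bounding the wait keeps the client from hanging forever if the cluster never comes up, while preserving the "only return once ready" semantics described in the comment.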
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805455 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -415,62 +513,83 @@ public int run(String[] args) { } System.out.println(description); return 0; + } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { // TODO RM --- End diff -- Sorry, I added this TODO while fixing some of your first comments.
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805382 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java --- @@ -211,18 +185,45 @@ public void run() { Runtime.getRuntime().addShutdownHook(clientShutdownHook); isConnected = true; + + logAndSysout("Waiting until all TaskManagers have connected"); --- End diff -- Thank you. I fixed that by using a non-static variable for the logger and dynamically retrieving the class name, i.e. ```java private final Logger LOG = LoggerFactory.getLogger(getClass()); ```
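The `LoggerFactory.getLogger(getClass())` fix works because instance field initializers see the runtime class of the object, so a subclass automatically logs under its own name. A minimal illustration of that mechanism (using plain class names instead of slf4j, so it stands alone; the class names are made up):

```java
public class LoggerNameDemo {

    static class BaseClient {
        // Stand-in for LoggerFactory.getLogger(getClass()): resolved per
        // instance at runtime, so subclasses get their own name.
        final String dynamicLoggerName = getClass().getSimpleName();

        // A static logger would be fixed at the declaring class forever.
        static final String STATIC_LOGGER_NAME = BaseClient.class.getSimpleName();
    }

    static class YarnClient extends BaseClient {}

    public static void main(String[] args) {
        System.out.println(new YarnClient().dynamicLoggerName); // YarnClient
        System.out.println(BaseClient.STATIC_LOGGER_NAME);      // BaseClient
    }
}
```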
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805417 --- Diff: docs/apis/cli.md --- @@ -105,6 +105,10 @@ The command line can be used to ./bin/flink list -r +- List running Flink jobs inside Flink YARN session: + +./bin/flink list -m yarn-cluster -yid -r --- End diff -- Thank you, I'll look into this.
[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327524#comment-15327524 ] ASF GitHub Bot commented on FLINK-3859: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2088#discussion_r66804777 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala --- @@ -143,15 +143,30 @@ object ScalarFunctions { new MultiTypeMethodCallGen(BuiltInMethods.ABS)) addSqlFunction( +ABS, +Seq(BIG_DEC_TYPE_INFO), +new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC)) + + addSqlFunction( --- End diff -- Just wondering, why are there no functions for `FLOAT`? Are they handled by the `DOUBLE` functions?
[GitHub] flink pull request #2088: [FLINK-3859] [table] Add BigDecimal/BigInteger sup...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2088#discussion_r66804777 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala --- @@ -143,15 +143,30 @@ object ScalarFunctions { new MultiTypeMethodCallGen(BuiltInMethods.ABS)) addSqlFunction( +ABS, +Seq(BIG_DEC_TYPE_INFO), +new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC)) + + addSqlFunction( --- End diff -- Just wondering, why are there no functions for `FLOAT`? Are they handled by the `DOUBLE` functions?
[jira] [Commented] (FLINK-3859) Add BigDecimal/BigInteger support to Table API
[ https://issues.apache.org/jira/browse/FLINK-3859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327518#comment-15327518 ] ASF GitHub Bot commented on FLINK-3859: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2088#discussion_r66804568

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---
@@ -97,11 +97,26 @@ object CodeGenUtils {
     case _ => "null"
   }

-  def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType match {
-    case nti: NumericTypeInfo[_] => // ok
-    case _ => throw new CodeGenException("Numeric expression type expected.")
+  def superPrimitive(typeInfo: TypeInformation[_]): String = typeInfo match {
+    case _: FractionalTypeInfo[_] => "double"
+    case _ => "long"
   }

+  // --
+
+  def requireNumeric(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isNumeric(genExpr.resultType)) {
+      throw new CodeGenException("Numeric expression type expected, but was " +
+        s"'${genExpr.resultType}'")
+    }
+
+  def requireNumericOrComparable(genExpr: GeneratedExpression) =
--- End diff --

Rename to `requireComparable()`?
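For readers following the diff above: the refactoring replaces a pattern match with a check-then-throw utility that reports the offending type in its error message. A minimal standalone sketch of that validation style (the class, method names, and type-name set here are hypothetical illustrations, not Flink's actual `TypeCheckUtils`):

```java
import java.util.Set;

// Illustrative check-then-throw validation, loosely modeled on the
// requireNumeric refactoring discussed in the diff above. All names are
// hypothetical; this does not depend on Flink.
public class TypeChecks {

    // A stand-in for "is this type numeric?" over simple type names.
    private static final Set<String> NUMERIC =
        Set.of("BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", "BIG_DEC");

    public static boolean isNumeric(String typeName) {
        return NUMERIC.contains(typeName);
    }

    // Throws with a descriptive message naming the actual type, instead of
    // a bare "Numeric expression type expected." as in the old pattern match.
    public static void requireNumeric(String typeName) {
        if (!isNumeric(typeName)) {
            throw new IllegalArgumentException(
                "Numeric expression type expected, but was '" + typeName + "'");
        }
    }

    public static void main(String[] args) {
        requireNumeric("BIG_DEC"); // passes silently
        try {
            requireNumeric("STRING");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The advantage over the original match-based version is purely diagnostic: failures carry the concrete type, which matters when the check is invoked deep inside code generation.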
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327501#comment-15327501 ] ASF GitHub Bot commented on FLINK-3937: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2085 Thanks a lot for adding this feature. I looked through the code and tried it out on a cluster. Once my concerns are addressed (either through changes or comments) we can merge the change. > Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters > -- > > Key: FLINK-3937 > URL: https://issues.apache.org/jira/browse/FLINK-3937 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Maximilian Michels >Priority: Trivial > Attachments: improve_flink_cli_yarn_integration.patch > > > Currently, flink cli can't figure out JobManager RPC location for > Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop > subcommands are hard to invoke if you only know the YARN application ID. As > an improvement, I suggest adding a -yid option to the > mentioned subcommands that can be used together with -m yarn-cluster. Flink > cli would then retrieve JobManager RPC location from YARN ResourceManager.
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327495#comment-15327495 ] ASF GitHub Bot commented on FLINK-3937: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66801768

--- Diff: docs/setup/yarn_setup.md ---
@@ -143,6 +143,34 @@ Note that in this case its not possible to stop the YARN session using Flink.
 Use the YARN utilities (`yarn application -kill `) to stop the YARN session.
+ Attach to an existing Session
+
+Use the following command to start a session
+
+~~~bash
+./bin/yarn-session.sh
+~~~
+
+This command will show you the following overview:
+
+~~~bash
+Usage:
+  Required
+    -id,--applicationId  YARN application Id
--- End diff --

I think this option is also not listed when running `./bin/yarn-session.sh`.
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327491#comment-15327491 ] ASF GitHub Bot commented on FLINK-3937: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2085

Detached per-job submission is not working (`./bin/flink run -m yarn-cluster -yd -yn 1 ./examples/batch/WordCount.jar`):

```
2016-06-13 07:28:22,423 INFO org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://flink@10.0.2.15:45521/user/jobmanager.
2016-06-13 07:28:22,829 INFO org.apache.flink.yarn.ApplicationClient - Successfully registered at the ResourceManager using JobManager Actor[akka.tcp://flink@10.0.2.15:45521/user/jobmanager#1868998090]
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
TaskManager status (0/1)
All TaskManagers are connected
Cluster started
Using address /10.0.2.15:45521 to connect to JobManager.
JobManager web interface address http://quickstart.cloudera:8088/proxy/application_1447844011707_0039/
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
The program finished with the following exception:
Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, job id etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count].
org.apache.flink.client.program.DetachedEnvironment$DetachedJobExecutionResult.getAccumulatorResult(DetachedEnvironment.java:103)
org.apache.flink.api.java.DataSet.collect(DataSet.java:412)
org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:92)
2016-06-13 07:28:27,206 INFO org.apache.flink.yarn.YarnClusterClient - Disconnecting YarnClusterClient from ApplicationMaster
2016-06-13 07:28:27,208 INFO org.apache.flink.yarn.ApplicationClient - Stopped Application client.
2016-06-13 07:28:27,208 INFO org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager Actor[akka.tcp://flink@10.0.2.15:45521/user/jobmanager#1868998090].
2016-06-13 07:28:27,307 INFO org.apache.flink.yarn.YarnClusterClient - Application application_1447844011707_0039 finished with state RUNNING and final state UNDEFINED at 0
[cloudera@quickstart build-target]
```

In the JobManager logs, there is no sign of a job being submitted.
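The stack trace in the report above shows why detached submission breaks eager functions: the client only gets back a placeholder result object, and `collect()`/`print()` then immediately ask that placeholder for accumulator values. A toy standalone model of that behavior follows (all names here are hypothetical illustrations, loosely modeled on Flink's `DetachedEnvironment.DetachedJobExecutionResult`, not the actual code):

```java
// Toy model of the detached-mode failure: after detached submission the
// client holds only a placeholder result, so any call that needs job output
// must throw. Names are hypothetical; this does not depend on Flink.
public class DetachedResultDemo {

    static class DetachedJobResult {
        // Mirrors the behavior seen in the stack trace: accessing results of
        // a detached job is unsupported by construction.
        Object getAccumulatorResult(String name) {
            throw new UnsupportedOperationException(
                "Job was submitted in detached mode. Results of job execution "
                + "are not available; avoid eager functions like "
                + "collect/print/printToErr/count.");
        }
    }

    // Returns true iff an eager-style access on the placeholder throws.
    static boolean eagerCallFails() {
        try {
            new DetachedJobResult().getAccumulatorResult("wordcount");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("eager call fails: " + eagerCallFails());
    }
}
```

The practical workaround in detached mode is to write results to a sink (e.g. a file output) rather than printing or collecting them on the client.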
[GitHub] flink issue #2085: [FLINK-3937] programmatic resuming of clusters
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2085

While trying out your code, I noticed that the `.yarn-properties` file is not properly deleted, even though my previous yarn session was shutting down correctly:

```
2016-06-13 07:16:51,885 INFO org.apache.flink.client.program.ClusterClient - TaskManager status (0/1)
TaskManager status (0/1)
2016-06-13 07:16:52,386 INFO org.apache.flink.client.program.ClusterClient - TaskManager status (0/1)
TaskManager status (0/1)
2016-06-13 07:16:52,887 INFO org.apache.flink.client.program.ClusterClient - All TaskManagers are connected
All TaskManagers are connected
2016-06-13 07:16:52,887 INFO org.apache.flink.client.program.ClusterClient - Looking up JobManager
2016-06-13 07:16:52,912 INFO org.apache.flink.client.program.ClusterClient - Looking up JobManager
Flink JobManager is now running on 10.0.2.15:51747
JobManager Web Interface: http://quickstart.cloudera:8088/proxy/application_1447844011707_0038/
Number of connected TaskManagers changed to 1. Slots available: 1
^[[A^C2016-06-13 07:25:35,370 INFO org.apache.flink.yarn.YarnClusterClient - Shutting down YarnClusterClient from the client shutdown hook
2016-06-13 07:25:35,372 INFO org.apache.flink.yarn.YarnClusterClient - Sending shutdown request to the Application Master
2016-06-13 07:25:35,373 INFO org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-06-13 07:25:35,429 INFO org.apache.flink.yarn.ApplicationClient - Stopped Application client.
2016-06-13 07:25:35,431 INFO org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764].
2016-06-13 07:25:35,469 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
2016-06-13 07:25:35,469 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
2016-06-13 07:25:35,622 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
2016-06-13 07:25:35,701 INFO org.apache.flink.yarn.YarnClusterClient - Deleting files in hdfs://quickstart.cloudera:8020/user/cloudera/.flink/application_1447844011707_0038
2016-06-13 07:25:35,706 INFO org.apache.flink.yarn.YarnClusterClient - Application application_1447844011707_0038 finished with state FINISHED and final state SUCCEEDED at 1465827935399
2016-06-13 07:25:36,567 INFO org.apache.flink.yarn.YarnClusterClient - YARN Client is shutting down
(reverse-i-search)`yarn': ./bin/^Crn-session.sh -n 1
[cloudera@quickstart build-target]$ ./bin/flink run -y yarn-cluster -yd -yn 1 ./examples/batch/WordCount.jar
JAR file does not exist: -y
Use the help option (-h or --help) to get help on the command.
[cloudera@quickstart build-target]$ ./bin/flink run -m yarn-cluster -yd -yn 1 ./examples/batch/WordCount.jar
2016-06-13 07:26:11,057 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found YARN properties file /tmp/.yarn-properties-cloudera
2016-06-13 07:26:11,057 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found YARN properties file /tmp/.yarn-properties-cloudera
Found YARN properties file /tmp/.yarn-properties-cloudera
2016-06-13 07:26:11,082 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Using JobManager address from YARN properties quickstart.cloudera/10.0.2.15:51747
2016-06-13 07:26:11,082 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Using JobManager address from YARN properties quickstart.cloudera/10.0.2.15:51747
Using JobManager address from YARN properties quickstart.cloudera/10.0.2.15:51747
2016-06-13 07:26:11,270 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:26:11,795 INFO org.apache.flink.yarn.YarnClusterDescriptor - Found application 'application_1447844011707_0038' with JobManager host name 'quickstart.cloudera' and port '51747' from Yarn properties file.
2016-06-13 07:26:11,842 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:26:11,878 ERROR org.apache.flink.yarn.YarnClusterDescriptor - The application application_1447844011707_0038 doesn't run anymore. It has previously completed with final status: SUCCEEDED
```
[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66800124

--- Diff: docs/apis/cli.md ---
@@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs.
                          configuration.
   -r,--running           Show only running programs and their JobIDs
   -s,--scheduled         Show only scheduled programs and their JobIDs
+  Additional arguments if -m yarn-cluster is set:
+  -yid   YARN application ID of Flink YARN session to
+         connect to. Must not be set if JobManager HA
+         is used. In this case, JobManager RPC
+         location is automatically retrieved from
+         Zookeeper.
--- End diff --

I tried this, but I wonder why it's not logging that the job has been cancelled. It logs "All TaskManagers are connected"; I don't think this message is relevant when cancelling a job.

```
[cloudera@quickstart build-target]$ ./bin/flink cancel -m yarn-cluster -yid application_1447844011707_0038 b9b8f76616073d09c596545a3cda978f
2016-06-13 07:22:21,355 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:22:22,277 INFO org.apache.flink.yarn.YarnClusterClient - Start application client.
2016-06-13 07:22:22,288 INFO org.apache.flink.yarn.ApplicationClient - Notification about new leader address akka.tcp://flink@10.0.2.15:51747/user/jobmanager with session ID null.
2016-06-13 07:22:22,290 INFO org.apache.flink.yarn.ApplicationClient - Received address of new leader akka.tcp://flink@10.0.2.15:51747/user/jobmanager with session ID null.
2016-06-13 07:22:22,290 INFO org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager null.
Waiting until all TaskManagers have connected
2016-06-13 07:22:22,297 INFO org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://flink@10.0.2.15:51747/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...
2016-06-13 07:22:22,542 INFO org.apache.flink.yarn.ApplicationClient - Successfully registered at the ResourceManager using JobManager Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764]
All TaskManagers are connected
2016-06-13 07:22:22,945 INFO org.apache.flink.yarn.YarnClusterClient - Shutting down YarnClusterClient from the client shutdown hook
2016-06-13 07:22:22,945 INFO org.apache.flink.yarn.YarnClusterClient - Disconnecting YarnClusterClient from ApplicationMaster
2016-06-13 07:22:22,947 INFO org.apache.flink.yarn.ApplicationClient - Stopped Application client.
2016-06-13 07:22:22,947 INFO org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764].
2016-06-13 07:22:23,056 INFO org.apache.flink.yarn.YarnClusterClient - Application application_1447844011707_0038 finished with state RUNNING and final state UNDEFINED at 0
[cloudera@quickstart build-target]$
```
[jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327463#comment-15327463 ] ASF GitHub Bot commented on FLINK-3937: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66799100

--- Diff: docs/apis/cli.md ---
@@ -105,6 +105,10 @@ The command line can be used to
 ./bin/flink list -r
+
+- List running Flink jobs inside Flink YARN session:
+
+./bin/flink list -m yarn-cluster -yid  -r
--- End diff --

Are there any tests for this functionality?