[jira] [Commented] (FLINK-3034) Redis Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359975#comment-15359975 ]

ASF GitHub Bot commented on FLINK-3034:
---------------------------------------

Github user subhankarb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1813#discussion_r69372418

    --- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
    @@ -0,0 +1,86 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-streaming-connectors</artifactId>
    +		<version>1.1-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-connector-redis_2.10</artifactId>
    +	<name>flink-connector-redis</name>
    +
    +	<packaging>jar</packaging>
    +
    +	<properties>
    +		<jedis.version>2.8.0</jedis.version>
    --- End diff --

    added in doc.

> Redis Sink Connector
> --------------------
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Matthias J. Sax
> Assignee: Subhankar Biswas
> Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359974#comment-15359974 ]

ASF GitHub Bot commented on FLINK-3034:
---------------------------------------

Github user subhankarb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1813#discussion_r69372416

    --- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
    @@ -0,0 +1,86 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-streaming-connectors</artifactId>
    +		<version>1.1-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-connector-redis_2.10</artifactId>
    +	<name>flink-connector-redis</name>
    +
    +	<packaging>jar</packaging>
    +
    +	<properties>
    +		<jedis.version>2.8.0</jedis.version>
    +	</properties>
    +
    +	<dependencies>
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-streaming-java_2.10</artifactId>
    +			<version>${project.version}</version>
    --- End diff --

    done.

> Redis Sink Connector
> --------------------
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Matthias J. Sax
> Assignee: Subhankar Biswas
> Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033
[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector
Github user subhankarb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1813#discussion_r69372418

    --- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
    @@ -0,0 +1,86 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-streaming-connectors</artifactId>
    +		<version>1.1-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-connector-redis_2.10</artifactId>
    +	<name>flink-connector-redis</name>
    +
    +	<packaging>jar</packaging>
    +
    +	<properties>
    +		<jedis.version>2.8.0</jedis.version>
    --- End diff --

    added in doc.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request #1813: [FLINK-3034] Redis Sink Connector
Github user subhankarb commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1813#discussion_r69372416

    --- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
    @@ -0,0 +1,86 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-streaming-connectors</artifactId>
    +		<version>1.1-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-connector-redis_2.10</artifactId>
    +	<name>flink-connector-redis</name>
    +
    +	<packaging>jar</packaging>
    +
    +	<properties>
    +		<jedis.version>2.8.0</jedis.version>
    +	</properties>
    +
    +	<dependencies>
    +
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-streaming-java_2.10</artifactId>
    +			<version>${project.version}</version>
    --- End diff --

    done.
[jira] [Closed] (FLINK-3965) Delegating GraphAlgorithm
[ https://issues.apache.org/jira/browse/FLINK-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan closed FLINK-3965.
-----------------------------
    Resolution: Implemented

Implemented in 149e7a01445b4ba494409472dc8b0b15c7221e9e

> Delegating GraphAlgorithm
> -------------------------
>
> Key: FLINK-3965
> URL: https://issues.apache.org/jira/browse/FLINK-3965
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.1.0
>
> Complex and related algorithms often overlap in computation of data. Two such
> examples are:
> 1) the local and global clustering coefficients each use a listing of triangles
> 2) the local clustering coefficient joins on vertex degree, and the underlying
> triangle listing annotates edge degree which uses vertex degree
> We can reuse and rewrite algorithm output by creating a {{ProxyObject}} as a
> delegate for method calls to the {{DataSet}} returned by the algorithm.
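The delegation idea in the issue above can be sketched with a plain Java dynamic proxy. This is a hedged illustration only — the class and method names below are hypothetical and this is not Gelly's actual implementation: calls on the proxy are forwarded to a lazily created delegate, so several algorithms that need the same intermediate result share one computation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

// Hypothetical sketch: a proxy that forwards every interface call to a
// lazily created delegate, so two consumers requesting the same
// intermediate result trigger only one computation.
public class Delegating {

    static int computations = 0;

    interface Result {
        int value();
    }

    static <T> T delegate(Class<T> iface, Supplier<T> factory) {
        // The real delegate is created on first method call and cached.
        Object[] cache = new Object[1];
        InvocationHandler handler = (Object proxy, Method method, Object[] methodArgs) -> {
            if (cache[0] == null) {
                cache[0] = factory.get();
            }
            return method.invoke(cache[0], methodArgs);
        };
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[]{iface}, handler));
    }

    static Result expensive() {
        computations++; // counts how often the shared work actually runs
        return () -> 42;
    }

    public static void main(String[] args) {
        Result shared = delegate(Result.class, Delegating::expensive);
        // Two consumers use the same proxy; the expensive work runs once.
        int a = shared.value();
        int b = shared.value();
        System.out.println(a + " " + b + " computations=" + computations);
    }
}
```

The same pattern also allows the delegate to be swapped after creation, which is what makes "rewriting" algorithm output possible behind a stable reference.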
[jira] [Closed] (FLINK-4135) Replace ChecksumHashCode as GraphAnalytic
[ https://issues.apache.org/jira/browse/FLINK-4135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Hogan closed FLINK-4135.
-----------------------------
    Resolution: Implemented

Implemented in 0efa6441420ba4a74ecb9a7d70d0a0d80e25e292

> Replace ChecksumHashCode as GraphAnalytic
> -----------------------------------------
>
> Key: FLINK-4135
> URL: https://issues.apache.org/jira/browse/FLINK-4135
> Project: Flink
> Issue Type: Improvement
> Components: Gelly
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Priority: Trivial
> Fix For: 1.1.0
>
> Create a {{GraphAnalytic}} to replace {{GraphUtils.checksumHashCode}} as
> there is nothing special about this computation and we can remove this
> function from the API.
[GitHub] flink pull request #2188: [FLINK-4135] [gelly] Replace ChecksumHashCode as G...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2188
[jira] [Commented] (FLINK-3965) Delegating GraphAlgorithm
[ https://issues.apache.org/jira/browse/FLINK-3965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359498#comment-15359498 ]

ASF GitHub Bot commented on FLINK-3965:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2032

> Delegating GraphAlgorithm
> -------------------------
>
> Key: FLINK-3965
> URL: https://issues.apache.org/jira/browse/FLINK-3965
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.1.0
>
> Complex and related algorithms often overlap in computation of data. Two such
> examples are:
> 1) the local and global clustering coefficients each use a listing of triangles
> 2) the local clustering coefficient joins on vertex degree, and the underlying
> triangle listing annotates edge degree which uses vertex degree
> We can reuse and rewrite algorithm output by creating a {{ProxyObject}} as a
> delegate for method calls to the {{DataSet}} returned by the algorithm.
[jira] [Commented] (FLINK-4135) Replace ChecksumHashCode as GraphAnalytic
[ https://issues.apache.org/jira/browse/FLINK-4135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359499#comment-15359499 ]

ASF GitHub Bot commented on FLINK-4135:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2188

> Replace ChecksumHashCode as GraphAnalytic
> -----------------------------------------
>
> Key: FLINK-4135
> URL: https://issues.apache.org/jira/browse/FLINK-4135
> Project: Flink
> Issue Type: Improvement
> Components: Gelly
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Priority: Trivial
> Fix For: 1.1.0
>
> Create a {{GraphAnalytic}} to replace {{GraphUtils.checksumHashCode}} as
> there is nothing special about this computation and we can remove this
> function from the API.
[GitHub] flink pull request #2032: [FLINK-3965] [gelly] Delegating GraphAlgorithm
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2032
[jira] [Resolved] (FLINK-4144) Yarn properties file: replace hostname/port with Yarn application id
[ https://issues.apache.org/jira/browse/FLINK-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maximilian Michels resolved FLINK-4144.
---------------------------------------
    Resolution: Fixed

Resolved with 7ab6837fde3adb588273ef6bb8f4f7a215fe9c03

> Yarn properties file: replace hostname/port with Yarn application id
> ---------------------------------------------------------------------
>
> Key: FLINK-4144
> URL: https://issues.apache.org/jira/browse/FLINK-4144
> Project: Flink
> Issue Type: Bug
> Components: YARN Client
> Affects Versions: 1.0.3
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
> We should use the application id instead of the host/port. The hostname and
> port of the JobManager can change (HA). Also, it is not unique depending on
> the network configuration.
[jira] [Commented] (FLINK-4144) Yarn properties file: replace hostname/port with Yarn application id
[ https://issues.apache.org/jira/browse/FLINK-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359397#comment-15359397 ]

ASF GitHub Bot commented on FLINK-4144:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2191

> Yarn properties file: replace hostname/port with Yarn application id
> ---------------------------------------------------------------------
>
> Key: FLINK-4144
> URL: https://issues.apache.org/jira/browse/FLINK-4144
> Project: Flink
> Issue Type: Bug
> Components: YARN Client
> Affects Versions: 1.0.3
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
> We should use the application id instead of the host/port. The hostname and
> port of the JobManager can change (HA). Also, it is not unique depending on
> the network configuration.
[jira] [Commented] (FLINK-3675) YARN ship folder inconsistent behavior
[ https://issues.apache.org/jira/browse/FLINK-3675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359399#comment-15359399 ]

Maximilian Michels commented on FLINK-3675:
-------------------------------------------

Additional fix with 16cdb61225d78c822566e33013162fa3e40fa279

> YARN ship folder inconsistent behavior
> --------------------------------------
>
> Key: FLINK-3675
> URL: https://issues.apache.org/jira/browse/FLINK-3675
> Project: Flink
> Issue Type: Bug
> Components: YARN Client
> Affects Versions: 1.0.0
> Reporter: Stefano Baghino
> Assignee: Maximilian Michels
> Priority: Critical
> Fix For: 1.1.0
>
> After [some discussion on the user mailing
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-YARN-ship-folder-td5458.html]
> it came up that the {{flink/lib}} folder is always supposed to be shipped to
> the YARN cluster so that all the nodes have access to its contents.
> Currently however, the Flink long-running YARN session actually ships the
> folder because it's explicitly specified in the {{yarn-session.sh}} script,
> while running a single job on YARN does not automatically ship it.
[jira] [Commented] (FLINK-3630) Little mistake in documentation
[ https://issues.apache.org/jira/browse/FLINK-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359318#comment-15359318 ]

Greg Hogan commented on FLINK-3630:
-----------------------------------

Hi [~riccardo_91], I see you were working on this issue. Is there anything we can help with?

> Little mistake in documentation
> -------------------------------
>
> Key: FLINK-3630
> URL: https://issues.apache.org/jira/browse/FLINK-3630
> Project: Flink
> Issue Type: Bug
> Components: DataSet API, Documentation
> Affects Versions: 1.0.0
> Reporter: Riccardo Diomedi
> Assignee: Riccardo Diomedi
> Priority: Minor
> Labels: documentation
>
> In the section "GroupCombine on a Grouped DataSet" of the following link:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#groupreduce-on-grouped-dataset
> there is a small mistake in the Java code in both the combine and reduce
> methods (it's the same mistake). The variable "word" is declared in the scope
> of the for loop, so it cannot be used in the collect method.
> A possible solution is to declare the variable before the for loop and
> assign a value inside the loop. Something like:
>
> int count = 0;
> String word = null;
> for (String record : words) {
>     word = record;
>     count++;
> }
> out.collect(new Tuple2<String, Integer>(word, count));
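The scoping fix proposed above can be checked with a self-contained sketch. This is plain Java with no Flink dependencies — the `combine` helper below is illustrative, not the documentation's exact snippet — but it shows the same pattern: declare `word` before the loop so it is still in scope when the result is emitted after the loop.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Map;

// Illustration of the fix: `word` is declared outside the for loop,
// assigned inside it, and used after it.
public class WordCountCombine {

    static Map.Entry<String, Integer> combine(Iterable<String> words) {
        int count = 0;
        String word = null;           // declared (and initialized) before the loop
        for (String record : words) {
            word = record;            // assigned inside the loop
            count++;
        }
        return new SimpleEntry<>(word, count); // `word` is in scope here
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> result =
                combine(Arrays.asList("flink", "flink", "flink"));
        System.out.println(result); // flink=3
    }
}
```

Note that initializing `word` to null (rather than leaving it unassigned) is required for the code to compile, since the compiler cannot prove the loop body runs at least once.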
[jira] [Commented] (FLINK-4144) Yarn properties file: replace hostname/port with Yarn application id
[ https://issues.apache.org/jira/browse/FLINK-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359262#comment-15359262 ]

ASF GitHub Bot commented on FLINK-4144:
---------------------------------------

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2191

    [FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4144

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2191

commit 4ee5cc1fc2b00f0247ed1f3ac91e9d89d5b080a9
Author: Maximilian Michels
Date:   2016-07-01T16:54:44Z

    [FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id

> Yarn properties file: replace hostname/port with Yarn application id
> ---------------------------------------------------------------------
>
> Key: FLINK-4144
> URL: https://issues.apache.org/jira/browse/FLINK-4144
> Project: Flink
> Issue Type: Bug
> Components: YARN Client
> Affects Versions: 1.0.3
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
> We should use the application id instead of the host/port. The hostname and
> port of the JobManager can change (HA). Also, it is not unique depending on
> the network configuration.
[GitHub] flink pull request #2191: [FLINK-4144] Yarn properties file: replace hostnam...
GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2191

    [FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4144

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2191

commit 4ee5cc1fc2b00f0247ed1f3ac91e9d89d5b080a9
Author: Maximilian Michels
Date:   2016-07-01T16:54:44Z

    [FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id
[jira] [Updated] (FLINK-3801) Upgrade Joda-Time library to 2.9.3
[ https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-3801:
--------------------------
    Description:
Currently joda-time 2.5 is used, which is very old.
We should upgrade to 2.9.3

  was:
Currently joda-time 2.5 is used, which is very old.
We should upgrade to 2.9.3

> Upgrade Joda-Time library to 2.9.3
> ----------------------------------
>
> Key: FLINK-3801
> URL: https://issues.apache.org/jira/browse/FLINK-3801
> Project: Flink
> Issue Type: Improvement
> Reporter: Ted Yu
> Priority: Minor
>
> Currently joda-time 2.5 is used, which is very old.
> We should upgrade to 2.9.3
[jira] [Updated] (FLINK-3753) KillerWatchDog should not use kill on toKill thread
[ https://issues.apache.org/jira/browse/FLINK-3753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-3753:
--------------------------
    Description:
{code}
// this is harsh, but this watchdog is a last resort
if (toKill.isAlive()) {
    toKill.stop();
}
{code}
stop() is deprecated.
See:
https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads

  was:
{code}
// this is harsh, but this watchdog is a last resort
if (toKill.isAlive()) {
    toKill.stop();
}
{code}
stop() is deprecated.
See:
https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads

> KillerWatchDog should not use kill on toKill thread
> ---------------------------------------------------
>
> Key: FLINK-3753
> URL: https://issues.apache.org/jira/browse/FLINK-3753
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Yu
> Priority: Minor
>
> {code}
> // this is harsh, but this watchdog is a last resort
> if (toKill.isAlive()) {
>     toKill.stop();
> }
> {code}
> stop() is deprecated.
> See:
> https://www.securecoding.cert.org/confluence/display/java/THI05-J.+Do+not+use+Thread.stop()+to+terminate+threads
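The usual replacement for Thread.stop() is cooperative cancellation. The sketch below is a hedged illustration, not Flink's actual KillerWatchDog code, and it assumes the watched thread responds to interruption: interrupt the thread, then join with a bounded grace period instead of stopping it forcibly.

```java
// Sketch: cooperative shutdown instead of the deprecated Thread.stop().
// Works only if the target thread is interruptible (blocking calls that
// throw InterruptedException, or code that polls Thread.interrupted()).
public class WatchDogSketch {

    static boolean shutDown(Thread toKill, long graceMillis) throws InterruptedException {
        if (toKill.isAlive()) {
            toKill.interrupt();       // request cooperative shutdown
            toKill.join(graceMillis); // wait for it to die, but not forever
        }
        return !toKill.isAlive();     // report whether it actually terminated
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000); // simulates long blocking work
            } catch (InterruptedException e) {
                // interrupted: clean up and exit
            }
        });
        worker.start();
        System.out.println("terminated cleanly: " + shutDown(worker, 5_000));
    }
}
```

Unlike stop(), this never kills a thread while it holds a lock in an inconsistent state; the trade-off is that a thread stuck in non-interruptible code will simply outlive the grace period, which the boolean return value makes visible.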
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359165#comment-15359165 ]

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2131

    Thank you for the quick fix. I hope I can take a look tonight. Otherwise,
    I'll look at it early next week. Thanks a lot for addressing my comments so
    quickly.

> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
> Issue Type: Sub-task
> Components: Kinesis Connector, Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis
> users can choose to "merge" and "split" shards at any time for adjustable
> stream throughput capacity. This article explains this quite clearly:
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic
> version of the Kinesis consumer
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task
> mapping is done in a simple round-robin-like distribution which can be
> locally determined at each Flink consumer task (the Flink Kafka consumer does
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer
> tasks coordinate which shards they are currently handling, and allow the
> tasks to ask the coordinator for a shard reassignment when a task finds a
> closed shard at runtime (shards will be closed by Kinesis when they are
> merged or split).
> We need a centralized coordinator state store which is visible to all Flink
> consumer tasks. Tasks can use this state store to locally determine which
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the
> coordination, but as described in
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use
> KCL for the implementation of the consumer if we want to leverage Flink's
> checkpointing mechanics. For our own implementation, ZooKeeper can be used
> for this state store, but that means it would require the user to set up ZK
> to work.
> Since this feature introduces extensive work, it is opened as a separate
> sub-task from the basic implementation
> https://issues.apache.org/jira/browse/FLINK-3229.
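The "round-robin-like distribution which can be locally determined at each Flink consumer task" described in the issue can be sketched as follows. The names are illustrative, not the connector's actual code: each subtask decides which shards it owns from nothing but its own index, the parallelism, and the globally ordered shard list, so no coordination is needed — until resharding changes that list, which is exactly the problem the ticket describes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hedged sketch of a locally determinable, round-robin-like shard assignment.
public class ShardAssignment {

    static List<String> shardsForSubtask(List<String> allShards,
                                         int subtaskIndex, int parallelism) {
        List<String> owned = new ArrayList<>();
        for (int i = 0; i < allShards.size(); i++) {
            // shard i goes to subtask (i mod parallelism)
            if (i % parallelism == subtaskIndex) {
                owned.add(allShards.get(i));
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> shards =
                Arrays.asList("shard-0", "shard-1", "shard-2", "shard-3", "shard-4");
        // With parallelism 2, the two subtasks partition the shards between
        // them without ever talking to each other.
        System.out.println(shardsForSubtask(shards, 0, 2)); // [shard-0, shard-2, shard-4]
        System.out.println(shardsForSubtask(shards, 1, 2)); // [shard-1, shard-3]
    }
}
```

Once Kinesis merges or splits shards, the shard list (and thus every subtask's local answer) changes, so a static rule like this must be replaced by the coordinated reassignment the issue proposes.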
[jira] [Assigned] (FLINK-4141) TaskManager failures not always recover when killed during an ApplicationMaster failure in HA mode on Yarn
[ https://issues.apache.org/jira/browse/FLINK-4141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maximilian Michels reassigned FLINK-4141:
-----------------------------------------
    Assignee: Maximilian Michels

> TaskManager failures not always recover when killed during an
> ApplicationMaster failure in HA mode on Yarn
> --------------------------------------------------------------
>
> Key: FLINK-4141
> URL: https://issues.apache.org/jira/browse/FLINK-4141
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.0.3
> Reporter: Stefan Richter
> Assignee: Maximilian Michels
>
> High availability on Yarn often fails to recover in the following test
> scenario:
> 1. Kill the application master process.
> 2. Then, while the application master is recovering, randomly kill several
> task managers (with some delay).
> After the application master has recovered, not all the killed task managers
> are brought back, and no further attempts are made to restart them.
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359105#comment-15359105 ]

ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2131

    @rmetzger Thank you for your review. I hope I've addressed your last
    comments with the last commit. For the documentation, I added a bit more
    apart from the threading model: 1) enabling checkpointing, mostly borrowed
    from the Kafka documentation, and 2) information on how the consumer
    internally uses the Kinesis APIs so that users can make sense of any limit
    warnings they are getting in the logs. Please let me know if there's
    anything else to address!

> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
> Issue Type: Sub-task
> Components: Kinesis Connector, Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis
> users can choose to "merge" and "split" shards at any time for adjustable
> stream throughput capacity. This article explains this quite clearly:
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic
> version of the Kinesis consumer
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task
> mapping is done in a simple round-robin-like distribution which can be
> locally determined at each Flink consumer task (the Flink Kafka consumer does
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer
> tasks coordinate which shards they are currently handling, and allow the
> tasks to ask the coordinator for a shard reassignment when a task finds a
> closed shard at runtime (shards will be closed by Kinesis when they are
> merged or split).
> We need a centralized coordinator state store which is visible to all Flink
> consumer tasks. Tasks can use this state store to locally determine which
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the
> coordination, but as described in
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use
> KCL for the implementation of the consumer if we want to leverage Flink's
> checkpointing mechanics. For our own implementation, ZooKeeper can be used
> for this state store, but that means it would require the user to set up ZK
> to work.
> Since this feature introduces extensive work, it is opened as a separate
> sub-task from the basic implementation
> https://issues.apache.org/jira/browse/FLINK-3229.
[GitHub] flink issue #2131: [FLINK-3231][streaming-connectors] FlinkKinesisConsumer r...
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2131

    @rmetzger Thank you for your review. I hope I've addressed your last
    comments with the last commit. For the documentation, I added a bit more
    apart from the threading model: 1) enabling checkpointing, mostly borrowed
    from the Kafka documentation, and 2) information on how the consumer
    internally uses the Kinesis APIs so that users can make sense of any limit
    warnings they are getting in the logs. Please let me know if there's
    anything else to address!
[jira] [Commented] (FLINK-4141) TaskManager failures not always recover when killed during an ApplicationMaster failure in HA mode on Yarn
[ https://issues.apache.org/jira/browse/FLINK-4141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359090#comment-15359090 ]

ASF GitHub Bot commented on FLINK-4141:
---------------------------------------

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2190

    [FLINK-4141] remove leaderUpdated() method from ResourceManager

    This removes the leaderUpdated method from the framework. Further, it lets
    the RM client thread communicate directly with the ResourceManager actor.
    This is fine since the two are always spawned together. Failures of the
    ResourceManager actor will lead to dropped messages of the RM client
    thread. Failures of the RM client thread will inform the JobManager.

    The leaderUpdated() method was used to signal the ResourceManager framework
    that a new leader was elected. However, the method was not always called
    when the leader changed, only when a new leader was elected. This dropped
    all messages from the async Yarn RM client thread
    (YarnResourceManagerCallbackHandler) for the time that the old leader had
    failed and no new leader had been elected. The Yarn RM client thread used
    leader-tagged messages to communicate with the main Flink ResourceManager
    actor.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4141

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2190.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2190

commit c758121b9e5e2d7de8318bd529aa5da88ed424c6
Author: Maximilian Michels
Date:   2016-07-01T14:27:18Z

    [FLINK-4141] remove leaderUpdated() method from ResourceManager

> TaskManager failures not always recover when killed during an
> ApplicationMaster failure in HA mode on Yarn
> --------------------------------------------------------------
>
> Key: FLINK-4141
> URL: https://issues.apache.org/jira/browse/FLINK-4141
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.0.3
> Reporter: Stefan Richter
>
> High availability on Yarn often fails to recover in the following test
> scenario:
> 1. Kill the application master process.
> 2. Then, while the application master is recovering, randomly kill several
> task managers (with some delay).
> After the application master has recovered, not all the killed task managers
> are brought back, and no further attempts are made to restart them.
[GitHub] flink pull request #2190: [FLINK-4141] remove leaderUpdated() method from Re...
GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2190

    [FLINK-4141] remove leaderUpdated() method from ResourceManager

    This removes the leaderUpdated method from the framework. Further, it lets
    the RM client thread communicate directly with the ResourceManager actor.
    This is fine since the two are always spawned together. Failures of the
    ResourceManager actor will lead to dropped messages of the RM client
    thread. Failures of the RM client thread will inform the JobManager.

    The leaderUpdated() method was used to signal the ResourceManager framework
    that a new leader was elected. However, the method was not always called
    when the leader changed, only when a new leader was elected. This dropped
    all messages from the async Yarn RM client thread
    (YarnResourceManagerCallbackHandler) for the time that the old leader had
    failed and no new leader had been elected. The Yarn RM client thread used
    leader-tagged messages to communicate with the main Flink ResourceManager
    actor.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4141

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2190.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2190

commit c758121b9e5e2d7de8318bd529aa5da88ed424c6
Author: Maximilian Michels
Date:   2016-07-01T14:27:18Z

    [FLINK-4141] remove leaderUpdated() method from ResourceManager
[jira] [Created] (FLINK-4143) Configurable delimiter for metric identifier
Chesnay Schepler created FLINK-4143: --- Summary: Configurable delimiter for metric identifier Key: FLINK-4143 URL: https://issues.apache.org/jira/browse/FLINK-4143 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.1.0 Reporter: Chesnay Schepler Priority: Minor The metric identifier is currently hard-coded to separate components with a dot. We should make this configurable. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4143) Configurable delimiter for metric identifier
[ https://issues.apache.org/jira/browse/FLINK-4143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-4143: Issue Type: Improvement (was: Bug) > Configurable delimiter for metric identifier > > > Key: FLINK-4143 > URL: https://issues.apache.org/jira/browse/FLINK-4143 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Priority: Minor > > The metric identifier is currently hard-coded to separate components with a > dot. > We should make this configurable.
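Making the delimiter configurable amounts to joining the scope components with a character read from configuration instead of the hard-coded dot. A minimal sketch; the config key `metrics.scope.delimiter` and the helper names are hypothetical, not the actual Flink API:

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Sketch: build a metric identifier from its scope components using a
// delimiter taken from configuration, falling back to the hard-coded dot.
class MetricIdentifier {
    static final String DELIMITER_KEY = "metrics.scope.delimiter"; // hypothetical key

    static String identifier(List<String> scopeComponents, String metricName, Map<String, String> config) {
        String delimiter = config.getOrDefault(DELIMITER_KEY, "."); // "." is today's hard-coded behavior
        StringJoiner joiner = new StringJoiner(delimiter);
        scopeComponents.forEach(joiner::add);
        joiner.add(metricName);
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<String> scope = List.of("host1", "taskmanager", "myJob");
        // default: dot-separated, as hard-coded today
        System.out.println(identifier(scope, "numRecordsIn", Map.of()));
        // configured: e.g. "_" for backends where "." is reserved
        System.out.println(identifier(scope, "numRecordsIn", Map.of(DELIMITER_KEY, "_")));
    }
}
```

A configurable delimiter matters mainly for reporters whose backends assign meaning to dots (e.g. hierarchy separators), where a job name containing dots would otherwise split into spurious levels.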
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359079#comment-15359079 ] ASF GitHub Bot commented on FLINK-4118: --- Github user iemejia commented on the issue: https://github.com/apache/flink/pull/2176 Great, thanks for your review. > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished).
[GitHub] flink issue #2176: [FLINK-4118] The docker-flink image is outdated (1.0.2) a...
Github user iemejia commented on the issue: https://github.com/apache/flink/pull/2176 Great, thanks for your review.
[jira] [Commented] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.4.1
[ https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359073#comment-15359073 ] Stefan Richter commented on FLINK-4142: --- I have a log for the problem here: https://storage.googleapis.com/srichter/task_mgr_restart_endless.log > Recovery problem in HA on Hadoop Yarn 2.4.1 > --- > > Key: FLINK-4142 > URL: https://issues.apache.org/jira/browse/FLINK-4142 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.0.3 >Reporter: Stefan Richter > > On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario: > 1) Kill application master, let it recover normally. > 2) After that, kill a task manager. > Now, Yarn tries to restart the killed task manager in an endless loop.
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359060#comment-15359060 ] ASF GitHub Bot commented on FLINK-4118: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2176 That's great to hear! I'll write something on the Beam ML thread. > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished).
[GitHub] flink issue #2176: [FLINK-4118] The docker-flink image is outdated (1.0.2) a...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2176 That's great to hear! I'll write something on the Beam ML thread.
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359054#comment-15359054 ] ASF GitHub Bot commented on FLINK-4017: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2115 I'll have to try it out to be sure, but I can't see a problem looking through the code. > [py] Add Aggregation support to Python API > -- > > Key: FLINK-4017 > URL: https://issues.apache.org/jira/browse/FLINK-4017 > Project: Flink > Issue Type: Improvement > Components: Python API >Reporter: Geoffrey Mon >Priority: Minor > > Aggregations are not currently supported in the Python API. > I was getting started with setting up and working with Flink and figured this > would be a relatively simple task for me to get started with. Currently > working on this at https://github.com/geofbot/flink
[GitHub] flink issue #2115: [FLINK-4017] [py] Add Aggregation support to Python API
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2115 I'll have to try it out to be sure, but I can't see a problem looking through the code.
[jira] [Updated] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.5.0
[ https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4142: -- Description: On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario: 1) Kill application master, let it recover normally. 2) After that, kill a task manager. Now, Yarn tries to restart the killed task manager in an endless loop. was: On Hadoop Yarn 2.5.0, recovery in HA fails in the following scenario: 1) Kill application master, let it recover normally. 2) After that, kill a task manager. Now, Yarn tries to restart the killed task manager in an endless loop. > Recovery problem in HA on Hadoop Yarn 2.5.0 > --- > > Key: FLINK-4142 > URL: https://issues.apache.org/jira/browse/FLINK-4142 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.0.3 >Reporter: Stefan Richter > > On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario: > 1) Kill application master, let it recover normally. > 2) After that, kill a task manager. > Now, Yarn tries to restart the killed task manager in an endless loop.
[jira] [Updated] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.4.1
[ https://issues.apache.org/jira/browse/FLINK-4142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4142: -- Summary: Recovery problem in HA on Hadoop Yarn 2.4.1 (was: Recovery problem in HA on Hadoop Yarn 2.5.0) > Recovery problem in HA on Hadoop Yarn 2.4.1 > --- > > Key: FLINK-4142 > URL: https://issues.apache.org/jira/browse/FLINK-4142 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.0.3 >Reporter: Stefan Richter > > On Hadoop Yarn 2.4.1, recovery in HA fails in the following scenario: > 1) Kill application master, let it recover normally. > 2) After that, kill a task manager. > Now, Yarn tries to restart the killed task manager in an endless loop.
[GitHub] flink issue #2176: [FLINK-4118] The docker-flink image is outdated (1.0.2) a...
Github user iemejia commented on the issue: https://github.com/apache/flink/pull/2176 This should be OK now. In further commits I expect to fix the daemon issue and maybe add an HA version of the docker-compose file using ZooKeeper. One more question, aljoscha: I intend to add the Beam Flink runner and contribute a similar version into Beam, but I don't know the best approach for this. I naively tried putting the jars in $FLINK_HOME/lib but it didn't work. Any ideas?
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359047#comment-15359047 ] ASF GitHub Bot commented on FLINK-4118: --- Github user iemejia commented on the issue: https://github.com/apache/flink/pull/2176 This should be OK now. In further commits I expect to fix the daemon issue and maybe add an HA version of the docker-compose file using ZooKeeper. One more question, aljoscha: I intend to add the Beam Flink runner and contribute a similar version into Beam, but I don't know the best approach for this. I naively tried putting the jars in $FLINK_HOME/lib but it didn't work. Any ideas? > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished).
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359045#comment-15359045 ] ASF GitHub Bot commented on FLINK-4017: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2115 Please write a comment when you update the PR, we don't get any notifications for pushed commits :) > [py] Add Aggregation support to Python API > -- > > Key: FLINK-4017 > URL: https://issues.apache.org/jira/browse/FLINK-4017 > Project: Flink > Issue Type: Improvement > Components: Python API >Reporter: Geoffrey Mon >Priority: Minor > > Aggregations are not currently supported in the Python API. > I was getting started with setting up and working with Flink and figured this > would be a relatively simple task for me to get started with. Currently > working on this at https://github.com/geofbot/flink
[GitHub] flink issue #2115: [FLINK-4017] [py] Add Aggregation support to Python API
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2115 Please write a comment when you update the PR, we don't get any notifications for pushed commits :)
[jira] [Created] (FLINK-4142) Recovery problem in HA on Hadoop Yarn 2.5.0
Stefan Richter created FLINK-4142: - Summary: Recovery problem in HA on Hadoop Yarn 2.5.0 Key: FLINK-4142 URL: https://issues.apache.org/jira/browse/FLINK-4142 Project: Flink Issue Type: Bug Components: YARN Client Affects Versions: 1.0.3 Reporter: Stefan Richter On Hadoop Yarn 2.5.0, recovery in HA fails in the following scenario: 1) Kill application master, let it recover normally. 2) After that, kill a task manager. Now, Yarn tries to restart the killed task manager in an endless loop.
[GitHub] flink pull request #2176: [FLINK-4118] The docker-flink image is outdated (1...
Github user iemejia commented on a diff in the pull request: https://github.com/apache/flink/pull/2176#discussion_r69306327 --- Diff: flink-contrib/docker-flink/README.md --- @@ -1,80 +1,75 @@ -#Apache Flink cluster deployment on Docker using Docker-Compose +Apache Flink cluster deployment on docker using docker-compose -##Installation -###Install Docker +# Installation +Install the most recent stable version of docker https://docs.docker.com/installation/ -if you have issues with Docker-Compose versions incompatible with your version of Docker try +Install the most recent stable version of docker-compose +https://docs.docker.com/compose/install/ -`curl -sSL https://get.docker.com/ubuntu/ | sudo sh` +# Build -###Install Docker-Compose +Images are based on the official Java Alpine (OpenJDK 8) image and run +supervisord to stay alive when running containers. If you want to build the +flink image run: -``` -curl -L https://github.com/docker/compose/releases/download/1.1.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose +sh build.sh -chmod +x /usr/local/bin/docker-compose -``` - -###Get the repo - -###Build the images +or -Images are based on Ubuntu Trusty 14.04 and run Supervisord to stay alive when running containers. +docker build -t flink . -The base image installs Oracle Java JDK 1.7 and SSH client & server. You can change the SSH password there or add your own key and adjust SSH config. 
+If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: -- Run `./build.sh` +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink -###Deploy +# Deploy - Deploy cluster and see config/setup log output (best run in a screen session) -`docker-compose up` +docker-compose up - Deploy as a daemon (and return) -`docker-compose up -d` +docker-compose up -d - Scale the cluster up or down to *N* TaskManagers -`docker-compose scale taskmanager=` - -- Access the JobManager node with SSH (exposed on Port 220) +docker-compose scale taskmanager= -`ssh root@localhost -p 220` +- Access the Job Manager container -or on Mac OS X with boot2docker - -`ssh root@$(boot2docker ip) -p 220` - -The password is 'secret' +docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) /bin/sh - Kill the cluster -`docker-compose kill` +docker-compose kill - Upload a jar to the cluster -`scp -P 220 root@localhost:/` +for i in $(docker ps --filter name=flink --format={{.ID}}); do +docker cp $i:/ +done - Run a topology -`ssh -p 220 root@localhost /usr/local/flink/bin/flink run -c ` +docker run -it --rm flink:latest flink run -m -c --- End diff -- Yes, you are right, I hesitated about this. If you noticed, I added a third option apart from jobmanager/taskmanager to leave the image open-ended; my goal was partly that people could start a container with Flink as a client, or for any other use. However, this can be confusing (because people mostly expect to send their local jar), so I will change it as you suggest in the README, and advanced users can do as they wish.
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359036#comment-15359036 ] ASF GitHub Bot commented on FLINK-4118: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2176#discussion_r69307221 --- Diff: flink-contrib/docker-flink/README.md --- @@ -1,80 +1,75 @@ -#Apache Flink cluster deployment on Docker using Docker-Compose +Apache Flink cluster deployment on docker using docker-compose -##Installation -###Install Docker +# Installation +Install the most recent stable version of docker https://docs.docker.com/installation/ -if you have issues with Docker-Compose versions incompatible with your version of Docker try +Install the most recent stable version of docker-compose +https://docs.docker.com/compose/install/ -`curl -sSL https://get.docker.com/ubuntu/ | sudo sh` +# Build -###Install Docker-Compose +Images are based on the official Java Alpine (OpenJDK 8) image and run +supervisord to stay alive when running containers. If you want to build the +flink image run: -``` -curl -L https://github.com/docker/compose/releases/download/1.1.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose +sh build.sh -chmod +x /usr/local/bin/docker-compose -``` - -###Get the repo - -###Build the images +or -Images are based on Ubuntu Trusty 14.04 and run Supervisord to stay alive when running containers. +docker build -t flink . -The base image installs Oracle Java JDK 1.7 and SSH client & server. You can change the SSH password there or add your own key and adjust SSH config. 
+If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: -- Run `./build.sh` +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink -###Deploy +# Deploy - Deploy cluster and see config/setup log output (best run in a screen session) -`docker-compose up` +docker-compose up - Deploy as a daemon (and return) -`docker-compose up -d` +docker-compose up -d - Scale the cluster up or down to *N* TaskManagers -`docker-compose scale taskmanager=` - -- Access the JobManager node with SSH (exposed on Port 220) +docker-compose scale taskmanager= -`ssh root@localhost -p 220` +- Access the Job Manager container -or on Mac OS X with boot2docker - -`ssh root@$(boot2docker ip) -p 220` - -The password is 'secret' +docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) /bin/sh - Kill the cluster -`docker-compose kill` +docker-compose kill - Upload a jar to the cluster -`scp -P 220 root@localhost:/` +for i in $(docker ps --filter name=flink --format={{.ID}}); do +docker cp $i:/ +done - Run a topology -`ssh -p 220 root@localhost /usr/local/flink/bin/flink run -c ` +docker run -it --rm flink:latest flink run -m -c --- End diff -- I see. I can only speak from experience: I uploaded my jar to the JobManager container, then tried to run and it said that the jar was not available. I then found out that the run creates this new container that doesn't actually contain the jar. > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished). 
[GitHub] flink pull request #2176: [FLINK-4118] The docker-flink image is outdated (1...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2176#discussion_r69307221 --- Diff: flink-contrib/docker-flink/README.md --- @@ -1,80 +1,75 @@ -#Apache Flink cluster deployment on Docker using Docker-Compose +Apache Flink cluster deployment on docker using docker-compose -##Installation -###Install Docker +# Installation +Install the most recent stable version of docker https://docs.docker.com/installation/ -if you have issues with Docker-Compose versions incompatible with your version of Docker try +Install the most recent stable version of docker-compose +https://docs.docker.com/compose/install/ -`curl -sSL https://get.docker.com/ubuntu/ | sudo sh` +# Build -###Install Docker-Compose +Images are based on the official Java Alpine (OpenJDK 8) image and run +supervisord to stay alive when running containers. If you want to build the +flink image run: -``` -curl -L https://github.com/docker/compose/releases/download/1.1.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose +sh build.sh -chmod +x /usr/local/bin/docker-compose -``` - -###Get the repo - -###Build the images +or -Images are based on Ubuntu Trusty 14.04 and run Supervisord to stay alive when running containers. +docker build -t flink . -The base image installs Oracle Java JDK 1.7 and SSH client & server. You can change the SSH password there or add your own key and adjust SSH config. 
+If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: -- Run `./build.sh` +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink -###Deploy +# Deploy - Deploy cluster and see config/setup log output (best run in a screen session) -`docker-compose up` +docker-compose up - Deploy as a daemon (and return) -`docker-compose up -d` +docker-compose up -d - Scale the cluster up or down to *N* TaskManagers -`docker-compose scale taskmanager=` - -- Access the JobManager node with SSH (exposed on Port 220) +docker-compose scale taskmanager= -`ssh root@localhost -p 220` +- Access the Job Manager container -or on Mac OS X with boot2docker - -`ssh root@$(boot2docker ip) -p 220` - -The password is 'secret' +docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) /bin/sh - Kill the cluster -`docker-compose kill` +docker-compose kill - Upload a jar to the cluster -`scp -P 220 root@localhost:/` +for i in $(docker ps --filter name=flink --format={{.ID}}); do +docker cp $i:/ +done - Run a topology -`ssh -p 220 root@localhost /usr/local/flink/bin/flink run -c ` +docker run -it --rm flink:latest flink run -m -c --- End diff -- I see. I can only speak from experience: I uploaded my jar to the JobManager container, then tried to run and it said that the jar was not available. I then found out that the run creates this new container that doesn't actually contain the jar.
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359034#comment-15359034 ] ASF GitHub Bot commented on FLINK-4118: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2176#discussion_r69307034 --- Diff: flink-contrib/docker-flink/README.md --- @@ -1,80 +1,75 @@ -#Apache Flink cluster deployment on Docker using Docker-Compose +Apache Flink cluster deployment on docker using docker-compose -##Installation -###Install Docker +# Installation +Install the most recent stable version of docker https://docs.docker.com/installation/ -if you have issues with Docker-Compose versions incompatible with your version of Docker try +Install the most recent stable version of docker-compose +https://docs.docker.com/compose/install/ -`curl -sSL https://get.docker.com/ubuntu/ | sudo sh` +# Build -###Install Docker-Compose +Images are based on the official Java Alpine (OpenJDK 8) image and run +supervisord to stay alive when running containers. If you want to build the +flink image run: -``` -curl -L https://github.com/docker/compose/releases/download/1.1.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose +sh build.sh -chmod +x /usr/local/bin/docker-compose -``` - -###Get the repo - -###Build the images +or -Images are based on Ubuntu Trusty 14.04 and run Supervisord to stay alive when running containers. +docker build -t flink . -The base image installs Oracle Java JDK 1.7 and SSH client & server. You can change the SSH password there or add your own key and adjust SSH config. 
+If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: -- Run `./build.sh` +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink -###Deploy +# Deploy - Deploy cluster and see config/setup log output (best run in a screen session) -`docker-compose up` +docker-compose up - Deploy as a daemon (and return) -`docker-compose up -d` +docker-compose up -d - Scale the cluster up or down to *N* TaskManagers -`docker-compose scale taskmanager=` - -- Access the JobManager node with SSH (exposed on Port 220) +docker-compose scale taskmanager= -`ssh root@localhost -p 220` +- Access the Job Manager container -or on Mac OS X with boot2docker - -`ssh root@$(boot2docker ip) -p 220` - -The password is 'secret' +docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) /bin/sh - Kill the cluster -`docker-compose kill` +docker-compose kill - Upload a jar to the cluster -`scp -P 220 root@localhost:/` +for i in $(docker ps --filter name=flink --format={{.ID}}); do --- End diff -- Yep, the TaskManagers pull it from the JobManager which keeps it in a component called BlobManager. > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished).
[GitHub] flink pull request #2176: [FLINK-4118] The docker-flink image is outdated (1...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2176#discussion_r69307034 --- Diff: flink-contrib/docker-flink/README.md --- @@ -1,80 +1,75 @@ -#Apache Flink cluster deployment on Docker using Docker-Compose +Apache Flink cluster deployment on docker using docker-compose -##Installation -###Install Docker +# Installation +Install the most recent stable version of docker https://docs.docker.com/installation/ -if you have issues with Docker-Compose versions incompatible with your version of Docker try +Install the most recent stable version of docker-compose +https://docs.docker.com/compose/install/ -`curl -sSL https://get.docker.com/ubuntu/ | sudo sh` +# Build -###Install Docker-Compose +Images are based on the official Java Alpine (OpenJDK 8) image and run +supervisord to stay alive when running containers. If you want to build the +flink image run: -``` -curl -L https://github.com/docker/compose/releases/download/1.1.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose +sh build.sh -chmod +x /usr/local/bin/docker-compose -``` - -###Get the repo - -###Build the images +or -Images are based on Ubuntu Trusty 14.04 and run Supervisord to stay alive when running containers. +docker build -t flink . -The base image installs Oracle Java JDK 1.7 and SSH client & server. You can change the SSH password there or add your own key and adjust SSH config. 
+If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: -- Run `./build.sh` +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink -###Deploy +# Deploy - Deploy cluster and see config/setup log output (best run in a screen session) -`docker-compose up` +docker-compose up - Deploy as a daemon (and return) -`docker-compose up -d` +docker-compose up -d - Scale the cluster up or down to *N* TaskManagers -`docker-compose scale taskmanager=` - -- Access the JobManager node with SSH (exposed on Port 220) +docker-compose scale taskmanager= -`ssh root@localhost -p 220` +- Access the Job Manager container -or on Mac OS X with boot2docker - -`ssh root@$(boot2docker ip) -p 220` - -The password is 'secret' +docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) /bin/sh - Kill the cluster -`docker-compose kill` +docker-compose kill - Upload a jar to the cluster -`scp -P 220 root@localhost:/` +for i in $(docker ps --filter name=flink --format={{.ID}}); do --- End diff -- Yep, the TaskManagers pull it from the JobManager which keeps it in a component called BlobManager.
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359029#comment-15359029 ] ASF GitHub Bot commented on FLINK-4118: --- Github user iemejia commented on a diff in the pull request: https://github.com/apache/flink/pull/2176#discussion_r69306327 --- Diff: flink-contrib/docker-flink/README.md --- @@ -1,80 +1,75 @@ -#Apache Flink cluster deployment on Docker using Docker-Compose +Apache Flink cluster deployment on docker using docker-compose -##Installation -###Install Docker +# Installation +Install the most recent stable version of docker https://docs.docker.com/installation/ -if you have issues with Docker-Compose versions incompatible with your version of Docker try +Install the most recent stable version of docker-compose +https://docs.docker.com/compose/install/ -`curl -sSL https://get.docker.com/ubuntu/ | sudo sh` +# Build -###Install Docker-Compose +Images are based on the official Java Alpine (OpenJDK 8) image and run +supervisord to stay alive when running containers. If you want to build the +flink image run: -``` -curl -L https://github.com/docker/compose/releases/download/1.1.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose +sh build.sh -chmod +x /usr/local/bin/docker-compose -``` - -###Get the repo - -###Build the images +or -Images are based on Ubuntu Trusty 14.04 and run Supervisord to stay alive when running containers. +docker build -t flink . -The base image installs Oracle Java JDK 1.7 and SSH client & server. You can change the SSH password there or add your own key and adjust SSH config. 
+If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: -- Run `./build.sh` +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink -###Deploy +# Deploy - Deploy cluster and see config/setup log output (best run in a screen session) -`docker-compose up` +docker-compose up - Deploy as a daemon (and return) -`docker-compose up -d` +docker-compose up -d - Scale the cluster up or down to *N* TaskManagers -`docker-compose scale taskmanager=` - -- Access the JobManager node with SSH (exposed on Port 220) +docker-compose scale taskmanager= -`ssh root@localhost -p 220` +- Access the Job Manager container -or on Mac OS X with boot2docker - -`ssh root@$(boot2docker ip) -p 220` - -The password is 'secret' +docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) /bin/sh - Kill the cluster -`docker-compose kill` +docker-compose kill - Upload a jar to the cluster -`scp -P 220 root@localhost:/` +for i in $(docker ps --filter name=flink --format={{.ID}}); do +docker cp $i:/ +done - Run a topology -`ssh -p 220 root@localhost /usr/local/flink/bin/flink run -c ` +docker run -it --rm flink:latest flink run -m -c --- End diff -- Yes, you are right, I hesitated about this, if you noticed, I added a third option apart of jobmanager/taskmanager to let the image open ended, my goal was in part that people could start an container with flink as a client, or for any other use. However this can be confusing (because people mostly expect to send ther local jar, but I will change it as you suggest in the README, and advanced users can do as they wish. 
> The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359031#comment-15359031 ] ASF GitHub Bot commented on FLINK-4118: --- Github user iemejia commented on a diff in the pull request: https://github.com/apache/flink/pull/2176#discussion_r69306386 --- Diff: flink-contrib/docker-flink/README.md --- @@ -1,80 +1,75 @@ -#Apache Flink cluster deployment on Docker using Docker-Compose +Apache Flink cluster deployment on docker using docker-compose -##Installation -###Install Docker +# Installation +Install the most recent stable version of docker https://docs.docker.com/installation/ -if you have issues with Docker-Compose versions incompatible with your version of Docker try +Install the most recent stable version of docker-compose +https://docs.docker.com/compose/install/ -`curl -sSL https://get.docker.com/ubuntu/ | sudo sh` +# Build -###Install Docker-Compose +Images are based on the official Java Alpine (OpenJDK 8) image and run +supervisord to stay alive when running containers. If you want to build the +flink image run: -``` -curl -L https://github.com/docker/compose/releases/download/1.1.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose +sh build.sh -chmod +x /usr/local/bin/docker-compose -``` - -###Get the repo - -###Build the images +or -Images are based on Ubuntu Trusty 14.04 and run Supervisord to stay alive when running containers. +docker build -t flink . -The base image installs Oracle Java JDK 1.7 and SSH client & server. You can change the SSH password there or add your own key and adjust SSH config. 
+If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: -- Run `./build.sh` +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink -###Deploy +# Deploy - Deploy cluster and see config/setup log output (best run in a screen session) -`docker-compose up` +docker-compose up - Deploy as a daemon (and return) -`docker-compose up -d` +docker-compose up -d - Scale the cluster up or down to *N* TaskManagers -`docker-compose scale taskmanager=` - -- Access the JobManager node with SSH (exposed on Port 220) +docker-compose scale taskmanager= -`ssh root@localhost -p 220` +- Access the Job Manager container -or on Mac OS X with boot2docker - -`ssh root@$(boot2docker ip) -p 220` - -The password is 'secret' +docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) /bin/sh - Kill the cluster -`docker-compose kill` +docker-compose kill - Upload a jar to the cluster -`scp -P 220 root@localhost:/` +for i in $(docker ps --filter name=flink --format={{.ID}}); do --- End diff -- Nice I didn't know that flink took care of this, fix in mins. > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359012#comment-15359012 ] ASF GitHub Bot commented on FLINK-4118: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2176#discussion_r69304962 --- Diff: flink-contrib/docker-flink/README.md --- @@ -1,80 +1,75 @@ -#Apache Flink cluster deployment on Docker using Docker-Compose +Apache Flink cluster deployment on docker using docker-compose -##Installation -###Install Docker +# Installation +Install the most recent stable version of docker https://docs.docker.com/installation/ -if you have issues with Docker-Compose versions incompatible with your version of Docker try +Install the most recent stable version of docker-compose +https://docs.docker.com/compose/install/ -`curl -sSL https://get.docker.com/ubuntu/ | sudo sh` +# Build -###Install Docker-Compose +Images are based on the official Java Alpine (OpenJDK 8) image and run +supervisord to stay alive when running containers. If you want to build the +flink image run: -``` -curl -L https://github.com/docker/compose/releases/download/1.1.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose +sh build.sh -chmod +x /usr/local/bin/docker-compose -``` - -###Get the repo - -###Build the images +or -Images are based on Ubuntu Trusty 14.04 and run Supervisord to stay alive when running containers. +docker build -t flink . -The base image installs Oracle Java JDK 1.7 and SSH client & server. You can change the SSH password there or add your own key and adjust SSH config. 
+If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: -- Run `./build.sh` +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink -###Deploy +# Deploy - Deploy cluster and see config/setup log output (best run in a screen session) -`docker-compose up` +docker-compose up - Deploy as a daemon (and return) -`docker-compose up -d` +docker-compose up -d - Scale the cluster up or down to *N* TaskManagers -`docker-compose scale taskmanager=` - -- Access the JobManager node with SSH (exposed on Port 220) +docker-compose scale taskmanager= -`ssh root@localhost -p 220` +- Access the Job Manager container -or on Mac OS X with boot2docker - -`ssh root@$(boot2docker ip) -p 220` - -The password is 'secret' +docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) /bin/sh - Kill the cluster -`docker-compose kill` +docker-compose kill - Upload a jar to the cluster -`scp -P 220 root@localhost:/` +for i in $(docker ps --filter name=flink --format={{.ID}}); do +docker cp $i:/ +done - Run a topology -`ssh -p 220 root@localhost /usr/local/flink/bin/flink run -c ` +docker run -it --rm flink:latest flink run -m -c --- End diff -- AFAIK `docker run` starts a new container to run the command. This should probably be something like: ``` docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) flink run -m -c ``` This would run the command on the existing JobManager container where the jar was previously uploaded. > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. 
> it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
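The `docker run` vs `docker exec` point made in the review comment above can be modeled in a few lines: `run` always creates a fresh container from the image, so a jar previously copied into the running JobManager container is not visible to it, whereas `exec` enters the existing container where the jar lives. A toy Python model of that difference (hypothetical names, no real Docker involved):

```python
class Image:
    def __init__(self, files=()):
        self.files = set(files)  # filesystem baked into the image


class Container:
    def __init__(self, image: Image):
        self.files = set(image.files)  # each container starts from the image


def docker_run(image: Image, wanted: str) -> bool:
    # `docker run` starts a brand-new container from the image.
    return wanted in Container(image).files


def docker_exec(container: Container, wanted: str) -> bool:
    # `docker exec` runs inside an already-running container.
    return wanted in container.files


flink_image = Image()
jobmanager = Container(flink_image)
jobmanager.files.add("job.jar")  # models the earlier `docker cp` step

assert not docker_run(flink_image, "job.jar")  # fresh container: jar missing
assert docker_exec(jobmanager, "job.jar")      # existing container: jar there
```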
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359013#comment-15359013 ] ASF GitHub Bot commented on FLINK-4118: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2176 I had two more comments about the README but after that it should be good to merge. > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359010#comment-15359010 ] ASF GitHub Bot commented on FLINK-4118: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2176#discussion_r69304467 --- Diff: flink-contrib/docker-flink/README.md --- @@ -1,80 +1,75 @@ -#Apache Flink cluster deployment on Docker using Docker-Compose +Apache Flink cluster deployment on docker using docker-compose -##Installation -###Install Docker +# Installation +Install the most recent stable version of docker https://docs.docker.com/installation/ -if you have issues with Docker-Compose versions incompatible with your version of Docker try +Install the most recent stable version of docker-compose +https://docs.docker.com/compose/install/ -`curl -sSL https://get.docker.com/ubuntu/ | sudo sh` +# Build -###Install Docker-Compose +Images are based on the official Java Alpine (OpenJDK 8) image and run +supervisord to stay alive when running containers. If you want to build the +flink image run: -``` -curl -L https://github.com/docker/compose/releases/download/1.1.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose +sh build.sh -chmod +x /usr/local/bin/docker-compose -``` - -###Get the repo - -###Build the images +or -Images are based on Ubuntu Trusty 14.04 and run Supervisord to stay alive when running containers. +docker build -t flink . -The base image installs Oracle Java JDK 1.7 and SSH client & server. You can change the SSH password there or add your own key and adjust SSH config. 
+If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: -- Run `./build.sh` +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink -###Deploy +# Deploy - Deploy cluster and see config/setup log output (best run in a screen session) -`docker-compose up` +docker-compose up - Deploy as a daemon (and return) -`docker-compose up -d` +docker-compose up -d - Scale the cluster up or down to *N* TaskManagers -`docker-compose scale taskmanager=` - -- Access the JobManager node with SSH (exposed on Port 220) +docker-compose scale taskmanager= -`ssh root@localhost -p 220` +- Access the Job Manager container -or on Mac OS X with boot2docker - -`ssh root@$(boot2docker ip) -p 220` - -The password is 'secret' +docker exec -it $(docker ps --filter name=flink_jobmanager --format={{.ID}}) /bin/sh - Kill the cluster -`docker-compose kill` +docker-compose kill - Upload a jar to the cluster -`scp -P 220 root@localhost:/` +for i in $(docker ps --filter name=flink --format={{.ID}}); do --- End diff -- The jar only needs to be uploaded to the JobManager container, so something like this should suffice: ``` docker cp $(docker ps --filter name=flink_jobmanager --format={{.ID}}):/ ``` > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359009#comment-15359009 ] ASF GitHub Bot commented on FLINK-4118: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2176 You were right, I did exactly the same thing I did on OS X on a new Ubuntu 16.04 installation and it worked. > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4141) TaskManager failures not always recover when killed during an ApplicationMaster failure in HA mode on Yarn
Stefan Richter created FLINK-4141: - Summary: TaskManager failures not always recover when killed during an ApplicationMaster failure in HA mode on Yarn Key: FLINK-4141 URL: https://issues.apache.org/jira/browse/FLINK-4141 Project: Flink Issue Type: Bug Affects Versions: 1.0.3 Reporter: Stefan Richter High availability on Yarn often fails to recover in the following test scenario: 1. Kill the application master process. 2. Then, while the application master is recovering, randomly kill several task managers (with some delay). After the application master has recovered, not all of the killed task managers are brought back, and no further attempts are made to restart them. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358982#comment-15358982 ] ASF GitHub Bot commented on FLINK-3231: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r69300764 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -17,157 +17,553 @@ package org.apache.flink.streaming.connectors.kinesis.internals; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Map; -import java.util.HashMap; + +import java.util.LinkedList; import java.util.List; -import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. - * The fetcher spawns a single thread for connection to each shard. + * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates + * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following: + * + * 1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset + * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be + * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe + * to the same subset of shards even after restoring) + * 2. decide where in each discovered shard should the fetcher start subscribing to + * 3. subscribe to shards by creating a single thread for each shard + * + * + * The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery), + * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed + * by multiple threads, these operations should only be done using the handler methods provided in this class. 
*/ -public class KinesisDataFetcher { +public class KinesisDataFetcher { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); - /** Config properties for the Flink Kinesis Consumer */ + // + // Consumer-wide settings + // + + /** Configuration properties for the Flink Kinesis Consumer */ private final Properties configProps; - /** The name of the consumer task that this fetcher was instantiated */ - private final String taskName; + /** The list of Kinesis streams that the consumer is subscribing to */ + private final List streams; + + /** +* The deserialization schema we will be using to convert Kinesis records to Flink objects. +* Note that since this might not be thread-safe, {@link ShardConsumer}s using this must +
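The fetcher Javadoc quoted above promises a shard-to-subtask assignment that is non-overlapping across subtasks and stable across restores. One simple way to get both properties is a deterministic hash of the shard id modulo the parallelism; the sketch below illustrates the idea (it is not Flink's exact assignment code — note the use of a stable digest rather than Python's builtin `hash()`, which is salted per process and would break restore determinism):

```python
import hashlib


def owns_shard(shard_id: str, subtask_index: int, parallelism: int) -> bool:
    # A stable digest keeps the assignment identical across restores;
    # the modulo makes the per-subtask subsets non-overlapping and exhaustive.
    h = int(hashlib.md5(shard_id.encode()).hexdigest(), 16)
    return h % parallelism == subtask_index


shards = [f"shardId-{i:012d}" for i in range(8)]
parallelism = 3

# Every shard is owned by exactly one subtask (non-overlapping, exhaustive).
for shard in shards:
    owners = [i for i in range(parallelism) if owns_shard(shard, i, parallelism)]
    assert len(owners) == 1

# The assignment comes out identical on a second pass (determinate restores).
first = {s: next(i for i in range(parallelism) if owns_shard(s, i, parallelism)) for s in shards}
second = {s: next(i for i in range(parallelism) if owns_shard(s, i, parallelism)) for s in shards}
assert first == second
```

The same rule also covers newly discovered shards after a Kinesis reshard: any subtask that discovers a new shard can decide ownership locally, without coordinating with its peers.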
[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes
[ https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358981#comment-15358981 ] ASF GitHub Bot commented on FLINK-4127: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2177#discussion_r69300607 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java --- @@ -269,26 +269,26 @@ * Percentage of heap space to remove from containers (YARN / Mesos), to compensate * for other JVM memory usage. */ - public static final String CONTAINERED_HEAP_CUTOFF_RATIO = "containered.heap-cutoff-ratio"; + public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio"; /** * Minimum amount of heap memory to remove in containers, as a safety margin. */ - public static final String CONTAINERED_HEAP_CUTOFF_MIN = "containered.heap-cutoff-min"; + public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min"; /** * Prefix for passing custom environment variables to Flink's master process. * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" * in the flink-conf.yaml. */ - public static final String CONTAINERED_MASTER_ENV_PREFIX = "containered.application-master.env."; + public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env."; --- End diff -- I thought this should be `CONTAINER_MASTER_ENV_PREFIX = container.master.env.`? > Clean up configuration and check breaking API changes > - > > Key: FLINK-4127 > URL: https://issues.apache.org/jira/browse/FLINK-4127 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.1.0 > > Attachments: flink-core.html, flink-java.html, flink-scala.html, > flink-streaming-java.html, flink-streaming-scala.html > > > For the upcoming 1.1. 
release, I'll check if there are any breaking API > changes and if the documentation is up to date with the configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
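For context on the keys discussed in the diff above, a flink-conf.yaml fragment using the renamed `containerized.*` keys might look like the following. The values are illustrative only, and note that the reviewer questions whether the env prefix should instead be `container.master.env.`, so the final key name may differ:

```yaml
# Fraction of container memory (YARN / Mesos) reserved for non-heap JVM usage
# (key renamed from containered.heap-cutoff-ratio in this PR).
containerized.heap-cutoff-ratio: 0.25

# Minimum amount of heap memory to cut off, as a safety margin (in MB).
containerized.heap-cutoff-min: 600

# Pass a custom environment variable to the master process via the renamed prefix
# (previously containered.application-master.env.).
containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
```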
[GitHub] flink pull request #2131: [FLINK-3231][streaming-connectors] FlinkKinesisCon...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r69300764 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -17,157 +17,553 @@ package org.apache.flink.streaming.connectors.kinesis.internals; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Map; -import java.util.HashMap; + +import java.util.LinkedList; import java.util.List; -import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; 
import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. - * The fetcher spawns a single thread for connection to each shard. + * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates + * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following: + * + * 1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset + * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be + * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe + * to the same subset of shards even after restoring) + * 2. decide where in each discovered shard should the fetcher start subscribing to + * 3. subscribe to shards by creating a single thread for each shard + * + * + * The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery), + * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed + * by multiple threads, these operations should only be done using the handler methods provided in this class. 
*/ -public class KinesisDataFetcher { +public class KinesisDataFetcher<T> { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); - /** Config properties for the Flink Kinesis Consumer */ + // + // Consumer-wide settings + // + + /** Configuration properties for the Flink Kinesis Consumer */ private final Properties configProps; - /** The name of the consumer task that this fetcher was instantiated */ - private final String taskName; + /** The list of Kinesis streams that the consumer is subscribing to */ + private final List<String> streams; + + /** +* The deserialization schema we will be using to convert Kinesis records to Flink objects. +* Note that since this might not be thread-safe, {@link ShardConsumer}s using this must +* clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}. +*/ + private final KinesisDeserializationSchema<T> deserializationSchema; + + //
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358979#comment-15358979 ] ASF GitHub Bot commented on FLINK-3231: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r69300586 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -17,157 +17,553 @@ package org.apache.flink.streaming.connectors.kinesis.internals; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Map; -import java.util.HashMap; + +import java.util.LinkedList; import java.util.List; -import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. - * The fetcher spawns a single thread for connection to each shard. + * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates + * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following: + * + * 1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset + * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be + * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe + * to the same subset of shards even after restoring) + * 2. decide where in each discovered shard should the fetcher start subscribing to + * 3. subscribe to shards by creating a single thread for each shard + * + * + * The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery), + * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed + * by multiple threads, these operations should only be done using the handler methods provided in this class. 
*/ -public class KinesisDataFetcher { +public class KinesisDataFetcher<T> { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); - /** Config properties for the Flink Kinesis Consumer */ + // + // Consumer-wide settings + // + + /** Configuration properties for the Flink Kinesis Consumer */ private final Properties configProps; - /** The name of the consumer task that this fetcher was instantiated */ - private final String taskName; + /** The list of Kinesis streams that the consumer is subscribing to */ + private final List<String> streams; + + /** +* The deserialization schema we will be using to convert Kinesis records to Flink objects. +* Note that since this might not be thread-safe, {@link ShardConsumer}s using this must +
[GitHub] flink pull request #2131: [FLINK-3231][streaming-connectors] FlinkKinesisCon...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r69300348 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -17,157 +17,553 @@ package org.apache.flink.streaming.connectors.kinesis.internals; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Map; -import java.util.HashMap; + +import java.util.LinkedList; import java.util.List; -import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; 
import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. - * The fetcher spawns a single thread for connection to each shard. + * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates + * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following: + * + * 1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset + * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be + * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe + * to the same subset of shards even after restoring) + * 2. decide where in each discovered shard should the fetcher start subscribing to + * 3. subscribe to shards by creating a single thread for each shard + * + * + * The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery), + * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed + * by multiple threads, these operations should only be done using the handler methods provided in this class. 
*/ -public class KinesisDataFetcher { +public class KinesisDataFetcher<T> { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); - /** Config properties for the Flink Kinesis Consumer */ + // + // Consumer-wide settings + // + + /** Configuration properties for the Flink Kinesis Consumer */ private final Properties configProps; - /** The name of the consumer task that this fetcher was instantiated */ - private final String taskName; + /** The list of Kinesis streams that the consumer is subscribing to */ + private final List<String> streams; + + /** +* The deserialization schema we will be using to convert Kinesis records to Flink objects. +* Note that since this might not be thread-safe, {@link ShardConsumer}s using this must +* clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}. +*/ + private final KinesisDeserializationSchema<T> deserializationSchema; + + //
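The fetcher's javadoc above requires that shard assignment be non-overlapping across subtasks and deterministic across restores. A minimal sketch of such a scheme is shown below; the class and method names are ours, not Flink's, and the actual PR may partition shards differently. The key property is that every subtask applies the same pure function to each discovered shard, so no coordination is needed and no shard is ever claimed twice:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of deterministic, non-overlapping shard assignment.
public class ShardAssignmentSketch {

    /** Deterministically maps a shard id to exactly one subtask index. */
    static int assignedSubtask(String shardId, int parallelism) {
        // Mask the sign bit rather than using Math.abs, which overflows
        // for Integer.MIN_VALUE.
        return (shardId.hashCode() & 0x7fffffff) % parallelism;
    }

    /** Returns the subset of discovered shards this subtask should consume. */
    static List<String> shardsForSubtask(List<String> allShards, int subtaskIndex, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (String shard : allShards) {
            if (assignedSubtask(shard, parallelism) == subtaskIndex) {
                mine.add(shard);
            }
        }
        return mine;
    }
}
```

Because the function depends only on the shard id and the parallelism, a restored subtask recomputes the same subset, and newly discovered shards are picked up by exactly one subtask.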
[jira] [Assigned] (FLINK-4140) CheckpointCoordinator fails to discard completed checkpoint
[ https://issues.apache.org/jira/browse/FLINK-4140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter reassigned FLINK-4140: - Assignee: Stefan Richter > CheckpointCoordinator fails to discard completed checkpoint > --- > > Key: FLINK-4140 > URL: https://issues.apache.org/jira/browse/FLINK-4140 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.0.3 >Reporter: Stefan Richter >Assignee: Stefan Richter > > Running a job in HA mode I saw the following warning in the job manager logs. > The warning appeared after the job was restarted due to a master failure. > I've skimmed the code and it looks like the user code class loader is used > everywhere when discarding the checkpoint, but something seems to not work as > expected (otherwise the warning should not appear). > {code} > 2016-07-01 13:08:33,218 WARN > org.apache.flink.runtime.checkpoint.SubtaskState - Failed to > discard checkpoint state: StateForTask(Size: 4, Duration: 2012, State: > SerializedValue) > java.lang.ClassNotFoundException: da.testing.State > at java.net.URLClassLoader$1.run(URLClassLoader.java:366) > at java.net.URLClassLoader$1.run(URLClassLoader.java:355) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:354) > at java.lang.ClassLoader.loadClass(ClassLoader.java:425) > at java.lang.ClassLoader.loadClass(ClassLoader.java:358) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:278) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336) > at > 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503) > at > org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) > at java.util.HashMap.readObject(HashMap.java:1180) > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907) > at > 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806) > at
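The stack trace above goes through `InstantiationUtil$ClassLoaderObjectInputStream.resolveClass`, i.e. an `ObjectInputStream` that resolves classes against a supplied user-code class loader instead of the JVM default. A minimal sketch of that pattern follows (the class name is ours, not Flink's); if the wrong loader is in scope when the checkpoint is discarded, resolving a user class such as `da.testing.State` fails with exactly the `ClassNotFoundException` shown in the log:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Deserializes objects using a caller-supplied (e.g. user-code) class loader.
class ClassLoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader classLoader;

    ClassLoaderAwareObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        // Resolve against the supplied loader first; fall back to the default
        // resolution for classes it cannot see.
        try {
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            return super.resolveClass(desc);
        }
    }
}
```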
[jira] [Updated] (FLINK-4140) CheckpointCoordinator fails to discard completed checkpoint
[ https://issues.apache.org/jira/browse/FLINK-4140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4140: -- Assignee: Ufuk Celebi (was: Stefan Richter) > CheckpointCoordinator fails to discard completed checkpoint > --- > > Key: FLINK-4140 > URL: https://issues.apache.org/jira/browse/FLINK-4140 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.0.3 >Reporter: Stefan Richter >Assignee: Ufuk Celebi
[jira] [Created] (FLINK-4140) CheckpointCoordinator fails to discard completed checkpoint
Stefan Richter created FLINK-4140:
-------------------------------------

             Summary: CheckpointCoordinator fails to discard completed checkpoint
                 Key: FLINK-4140
                 URL: https://issues.apache.org/jira/browse/FLINK-4140
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
    Affects Versions: 1.0.3
            Reporter: Stefan Richter


Running a job in HA mode I saw the following warning in the job manager logs. The warning appeared after the job was restarted due to a master failure. I've skimmed the code and it looks like the user code class loader is used everywhere when discarding the checkpoint, but something seems to not work as expected (otherwise the warning should not appear).

{code}
2016-07-01 13:08:33,218 WARN  org.apache.flink.runtime.checkpoint.SubtaskState - Failed to discard checkpoint state: StateForTask(Size: 4, Duration: 2012, State: SerializedValue)
java.lang.ClassNotFoundException: da.testing.State
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1486)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
    at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at java.util.HashMap.readObject(HashMap.java:1180)
    at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1907)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2016)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1940)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1806)
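The `ClassNotFoundException` above is raised while deserializing checkpoint state whose class (`da.testing.State`) lives only in the user jar, so the class loader supplied to the stream matters. The stack trace shows Flink's own `InstantiationUtil$ClassLoaderObjectInputStream` in the resolution path; below is a minimal, simplified sketch of that pattern (the class and method names in this sketch are illustrative, not Flink's actual API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Sketch: deserializing with an explicit class loader. A plain
// ObjectInputStream resolves classes against the default class loader,
// which does not see user-jar classes such as da.testing.State; the fix
// is to route class resolution through the user-code class loader.
public class ClassLoaderAwareDeserialization {

    /** ObjectInputStream that resolves classes via a caller-supplied class loader. */
    static class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Delegate to the given (e.g. user-code) class loader instead of the default.
            return Class.forName(desc.getName(), false, classLoader);
        }
    }

    /** Deserialize bytes, resolving all classes through the given class loader. */
    public static Object deserialize(byte[] bytes, ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(bytes), userCodeClassLoader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Round-trip a value to demonstrate the mechanism.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject("checkpoint-state");
        }
        Object restored =
            deserialize(bos.toByteArray(), Thread.currentThread().getContextClassLoader());
        System.out.println(restored); // prints checkpoint-state
    }
}
```

The bug report suggests some discard path is still deserializing without the user-code class loader in scope, despite this mechanism existing.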
[GitHub] flink issue #2189: [FLINK-3667] delay connection to JobManager until job exe...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2189 Sorry, didn't see your comments. The test failures should be resolved (I ran `mvn verify`). The changes delay the creation of the ActorSystem and thus any blocking actions until we have executed the user jar. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3667) Generalize client<->cluster communication
[ https://issues.apache.org/jira/browse/FLINK-3667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358971#comment-15358971 ]

ASF GitHub Bot commented on FLINK-3667:
---------------------------------------

Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2189

    Sorry, didn't see your comments. The test failures should be resolved (I ran `mvn verify`). The changes delay the creation of the ActorSystem and thus any blocking actions until we have executed the user jar.


> Generalize client<->cluster communication
> -----------------------------------------
>
>                 Key: FLINK-3667
>                 URL: https://issues.apache.org/jira/browse/FLINK-3667
>             Project: Flink
>          Issue Type: Improvement
>          Components: YARN Client
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 1.1.0
>
>
> Here are some notes I took when inspecting the client<->cluster classes with regard to future integration of other resource management frameworks in addition to Yarn (e.g. Mesos).
> {noformat}
> 1 Cluster Client Abstraction
>
> 1.1 Status Quo
> ──────────────
> 1.1.1 FlinkYarnClient
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Holds the cluster configuration (Flink-specific and Yarn-specific)
>   • Contains the deploy() method to deploy the cluster
>   • Creates the Hadoop Yarn client
>   • Receives the initial job manager address
>   • Bootstraps the FlinkYarnCluster
> 1.1.2 FlinkYarnCluster
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Wrapper around the Hadoop Yarn client
>   • Queries cluster for status updates
>   • Life time methods to start and shutdown the cluster
>   • Flink specific features like shutdown after job completion
> 1.1.3 ApplicationClient
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Acts as a middle-man for asynchronous cluster communication
>   • Designed to communicate with Yarn, not used in Standalone mode
> 1.1.4 CliFrontend
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Deeply integrated with FlinkYarnClient and FlinkYarnCluster
>   • Constantly distinguishes between Yarn and Standalone mode
>   • Would be nice to have a general abstraction in place
> 1.1.5 Client
> ╌╌╌╌╌╌╌╌╌╌╌╌
>   • Job submission and Job related actions, agnostic of resource framework
>
> 1.2 Proposal
> ────────────
> 1.2.1 ClusterConfig (before: AbstractFlinkYarnClient)
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Extensible cluster-agnostic config
>   • May be extended by specific cluster, e.g. YarnClusterConfig
> 1.2.2 ClusterClient (before: AbstractFlinkYarnClient)
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Deals with cluster (RM) specific communication
>   • Exposes framework agnostic information
>   • YarnClusterClient, MesosClusterClient, StandaloneClusterClient
> 1.2.3 FlinkCluster (before: AbstractFlinkYarnCluster)
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Basic interface to communicate with a running cluster
>   • Receives the ClusterClient for cluster-specific communication
>   • Should not have to care about the specific implementations of the client
> 1.2.4 ApplicationClient
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Can be changed to work cluster-agnostic (first steps already in FLINK-3543)
> 1.2.5 CliFrontend
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • CliFrontend does never have to differentiate between different cluster types after it has determined which cluster class to load.
>   • Base class handles framework agnostic command line arguments
>   • Pluggables for Yarn, Mesos handle specific commands
> {noformat}
> I would like to create/refactor the affected classes to set us up for a more flexible client side resource management abstraction.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
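The proposal in the notes above can be pictured as a small set of cluster-agnostic types. The sketch below uses the proposal's names (`ClusterConfig`, `ClusterClient`, `StandaloneClusterClient`); the method signatures and config keys are illustrative assumptions, not the final Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Framework-agnostic view of a cluster, per the proposal: CliFrontend only
// ever talks to these interfaces after picking a concrete implementation
// (YARN, Mesos, Standalone), so it never branches on the cluster type.
interface ClusterClient {
    String getJobManagerAddress(); // framework-agnostic information
    void shutdown();               // cluster life-cycle handling
}

// Extensible, cluster-agnostic configuration (could be subclassed, e.g. a
// hypothetical YarnClusterConfig adding YARN-only options).
class ClusterConfig {
    private final Map<String, String> options = new HashMap<>();

    void set(String key, String value) { options.put(key, value); }
    String get(String key, String def) { return options.getOrDefault(key, def); }
}

// One implementation per resource framework; only this class knows
// standalone-specific details.
class StandaloneClusterClient implements ClusterClient {
    private final ClusterConfig config;

    StandaloneClusterClient(ClusterConfig config) { this.config = config; }

    @Override
    public String getJobManagerAddress() {
        return config.get("jobmanager.rpc.address", "localhost") + ":"
             + config.get("jobmanager.rpc.port", "6123");
    }

    @Override
    public void shutdown() { /* nothing to tear down for a standalone cluster */ }
}

public class ClusterClientSketch {
    public static void main(String[] args) {
        ClusterConfig config = new ClusterConfig();
        config.set("jobmanager.rpc.address", "jm-host");
        ClusterClient client = new StandaloneClusterClient(config);
        System.out.println(client.getJobManagerAddress()); // prints jm-host:6123
        client.shutdown();
    }
}
```

The point of the shape, as the notes say, is that the frontend "never has to differentiate between different cluster types after it has determined which cluster class to load."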
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358969#comment-15358969 ] ASF GitHub Bot commented on FLINK-3231: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r69299251 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -17,157 +17,553 @@ package org.apache.flink.streaming.connectors.kinesis.internals; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Map; -import java.util.HashMap; + +import java.util.LinkedList; import java.util.List; -import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. - * The fetcher spawns a single thread for connection to each shard. + * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates + * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following: + * + * 1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset + * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be + * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe + * to the same subset of shards even after restoring) + * 2. decide where in each discovered shard should the fetcher start subscribing to + * 3. subscribe to shards by creating a single thread for each shard + * + * + * The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery), + * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed + * by multiple threads, these operations should only be done using the handler methods provided in this class. 
*/ -public class KinesisDataFetcher { +public class KinesisDataFetcher { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); - /** Config properties for the Flink Kinesis Consumer */ + // + // Consumer-wide settings + // + + /** Configuration properties for the Flink Kinesis Consumer */ private final Properties configProps; - /** The name of the consumer task that this fetcher was instantiated */ - private final String taskName; + /** The list of Kinesis streams that the consumer is subscribing to */ + private final List streams; + + /** +* The deserialization schema we will be using to convert Kinesis records to Flink objects. +* Note that since this might not be thread-safe, {@link ShardConsumer}s using this must +
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358968#comment-15358968 ] ASF GitHub Bot commented on FLINK-3231: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r69299145 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -17,157 +17,553 @@ package org.apache.flink.streaming.connectors.kinesis.internals; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Map; -import java.util.HashMap; + +import java.util.LinkedList; import java.util.List; -import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. - * The fetcher spawns a single thread for connection to each shard. + * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates + * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following: + * + * 1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset + * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be + * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe + * to the same subset of shards even after restoring) + * 2. decide where in each discovered shard should the fetcher start subscribing to + * 3. subscribe to shards by creating a single thread for each shard + * + * + * The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery), + * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed + * by multiple threads, these operations should only be done using the handler methods provided in this class. 
*/ -public class KinesisDataFetcher { +public class KinesisDataFetcher { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); - /** Config properties for the Flink Kinesis Consumer */ + // + // Consumer-wide settings + // + + /** Configuration properties for the Flink Kinesis Consumer */ private final Properties configProps; - /** The name of the consumer task that this fetcher was instantiated */ - private final String taskName; + /** The list of Kinesis streams that the consumer is subscribing to */ + private final List streams; + + /** +* The deserialization schema we will be using to convert Kinesis records to Flink objects. +* Note that since this might not be thread-safe, {@link ShardConsumer}s using this must +
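The `KinesisDataFetcher` javadoc quoted in this thread promises a shard-to-subtask assignment that is non-overlapping across subtasks and deterministic across restores. One way to get both properties is to make ownership a pure function of the shard id, which every subtask can evaluate locally; the sketch below is an illustrative stand-in for that idea, not the actual fetcher code:

```java
// Sketch: deterministic, non-overlapping shard assignment. A shard belongs
// to subtask i iff hash(shardId) mod parallelism == i. Because the rule
// depends only on the shard id and the parallelism, every subtask computes
// the same answer (no coordination needed), exactly one subtask owns each
// shard, and the mapping is stable across restores.
public class ShardAssignment {

    public static boolean isAssignedToSubtask(String shardId, int subtaskIndex, int parallelism) {
        // Mask the sign bit rather than Math.abs, which overflows for Integer.MIN_VALUE.
        int h = shardId.hashCode() & Integer.MAX_VALUE;
        return h % parallelism == subtaskIndex;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        String[] shards = {
            "shardId-000000000000", "shardId-000000000001", "shardId-000000000002"
        };
        for (String shard : shards) {
            int owners = 0;
            for (int i = 0; i < parallelism; i++) {
                if (isAssignedToSubtask(shard, i, parallelism)) {
                    owners++;
                }
            }
            // Exactly one subtask claims each shard.
            System.out.println(shard + " owners=" + owners); // prints owners=1
        }
    }
}
```

The same rule extends naturally to shards discovered later (after a Kinesis reshard): a newly discovered shard id hashes to exactly one subtask, so the "no two subtasks subscribe to the same shard" invariant from the javadoc is preserved without any central coordinator.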
[jira] [Resolved] (FLINK-3675) YARN ship folder inconsistent behavior
[ https://issues.apache.org/jira/browse/FLINK-3675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maximilian Michels resolved FLINK-3675.
---------------------------------------
    Resolution: Fixed

Fixed via 0483ba583c7790d13b8035c2916318a2b58c67d6

> YARN ship folder inconsistent behavior
> --------------------------------------
>
>                 Key: FLINK-3675
>                 URL: https://issues.apache.org/jira/browse/FLINK-3675
>             Project: Flink
>          Issue Type: Bug
>          Components: YARN Client
>    Affects Versions: 1.0.0
>            Reporter: Stefano Baghino
>            Assignee: Maximilian Michels
>            Priority: Critical
>             Fix For: 1.1.0
>
>
> After [some discussion on the user mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-YARN-ship-folder-td5458.html] it came up that the {{flink/lib}} folder is always supposed to be shipped to the YARN cluster so that all the nodes have access to its contents.
> Currently however, the Flink long-running YARN session actually ships the folder because it's explicitly specified in the {{yarn-session.sh}} script, while running a single job on YARN does not automatically ship it.
[GitHub] flink pull request #2131: [FLINK-3231][streaming-connectors] FlinkKinesisCon...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r69298876 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -60,10 +60,10 @@ to setup Kinesis streams. Make sure to create the appropriate IAM policy and use ### Kinesis Consumer --- End diff -- Good point! I'll add this.
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358962#comment-15358962 ] ASF GitHub Bot commented on FLINK-3231: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r69298876 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -60,10 +60,10 @@ to setup Kinesis streams. Make sure to create the appropriate IAM policy and use ### Kinesis Consumer --- End diff -- Good point! I'll add this. > Handle Kinesis-side resharding in Kinesis streaming consumer > > > Key: FLINK-3231 > URL: https://issues.apache.org/jira/browse/FLINK-3231 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > A big difference between Kinesis shards and Kafka partitions is that Kinesis > users can choose to "merge" and "split" shards at any time for adjustable > stream throughput capacity. This article explains this quite clearly: > https://brandur.org/kinesis-by-example. > This will break the static shard-to-task mapping implemented in the basic > version of the Kinesis consumer > (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task > mapping is done in a simple round-robin-like distribution which can be > locally determined at each Flink consumer task (Flink Kafka consumer does > this too). > To handle Kinesis resharding, we will need some way to let the Flink consumer > tasks coordinate which shards they are currently handling, and allow the > tasks to ask the coordinator for a shards reassignment when the task finds > out it has found a closed shard at runtime (shards will be closed by Kinesis > when it is merged and split). > We need a centralized coordinator state store which is visible to all Flink > consumer tasks. 
Tasks can use this state store to locally determine what > shards it can be reassigned. Amazon KCL uses a DynamoDB table for the > coordination, but as described in > https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use > KCL for the implementation of the consumer if we want to leverage Flink's > checkpointing mechanics. For our own implementation, Zookeeper can be used > for this state store, but that means it would require the user to set up ZK > to work. > Since this feature introduces extensive work, it is opened as a separate > sub-task from the basic implementation > https://issues.apache.org/jira/browse/FLINK-3229.
[jira] [Commented] (FLINK-3667) Generalize client<->cluster communication
[ https://issues.apache.org/jira/browse/FLINK-3667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358960#comment-15358960 ] Maximilian Michels commented on FLINK-3667: --- Fixed via f9b52a3114a2114e6846091acf3abb294a49615b Additional fixes: 3b593632dd162d951281fab8a8ed8c6bc2b07b39 a3aea27983d23d48bbad92c400d4cd42f36fabd3 cfd48a6f510c937080df0918fcb05aa410885c29 8d589623d2c2d039b014bc8783bef25351ec36ce > Generalize client<->cluster communication > - > > Key: FLINK-3667 > URL: https://issues.apache.org/jira/browse/FLINK-3667 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > > Here are some notes I took when inspecting the client<->cluster classes with > regard to future integration of other resource management frameworks in > addition to Yarn (e.g. Mesos). > {noformat} > 1 Cluster Client Abstraction > > 1.1 Status Quo > ── > 1.1.1 FlinkYarnClient > ╌ > • Holds the cluster configuration (Flink-specific and Yarn-specific) > • Contains the deploy() method to deploy the cluster > • Creates the Hadoop Yarn client > • Receives the initial job manager address > • Bootstraps the FlinkYarnCluster > 1.1.2 FlinkYarnCluster > ╌╌ > • Wrapper around the Hadoop Yarn client > • Queries cluster for status updates > • Life time methods to start and shutdown the cluster > • Flink specific features like shutdown after job completion > 1.1.3 ApplicationClient > ╌╌╌ > • Acts as a middle-man for asynchronous cluster communication > • Designed to communicate with Yarn, not used in Standalone mode > 1.1.4 CliFrontend > ╌ > • Deeply integrated with FlinkYarnClient and FlinkYarnCluster > • Constantly distinguishes between Yarn and Standalone mode > • Would be nice to have a general abstraction in place > 1.1.5 Client > > • Job submission and Job related actions, agnostic of resource framework > 1.2 Proposal > > 1.2.1 ClusterConfig (before: AbstractFlinkYarnClient) > ╌ > 
• Extensible cluster-agnostic config > • May be extended by specific cluster, e.g. YarnClusterConfig > 1.2.2 ClusterClient (before: AbstractFlinkYarnClient) > ╌ > • Deals with cluster (RM) specific communication > • Exposes framework agnostic information > • YarnClusterClient, MesosClusterClient, StandaloneClusterClient > 1.2.3 FlinkCluster (before: AbstractFlinkYarnCluster) > ╌ > • Basic interface to communicate with a running cluster > • Receives the ClusterClient for cluster-specific communication > • Should not have to care about the specific implementations of the > client > 1.2.4 ApplicationClient > ╌╌╌ > • Can be changed to work cluster-agnostic (first steps already in > FLINK-3543) > 1.2.5 CliFrontend > ╌ > • CliFrontend never has to differentiate between different > cluster types after it has determined which cluster class to load. > • Base class handles framework agnostic command line arguments > • Pluggables for Yarn, Mesos handle specific commands > {noformat} > I would like to create/refactor the affected classes to set us up for a more > flexible client-side resource management abstraction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
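The proposed hierarchy in the notes above can be sketched as a small Java example. This is a hypothetical illustration: the names `ClusterClient`, `StandaloneClusterClient`, and `FlinkCluster` come from the proposal, but the method signatures (`frameworkName`, `jobManagerAddress`, `describe`) are invented for the sketch and are not Flink's actual API.

```java
// Hypothetical sketch of the proposed cluster-agnostic client abstraction.
// Names follow the proposal; all signatures are illustrative assumptions.
public class ClusterAbstractionSketch {

    /** Framework-specific communication, hidden behind an agnostic interface. */
    public interface ClusterClient {
        String frameworkName();       // e.g. "yarn", "mesos", "standalone"
        String jobManagerAddress();   // framework-agnostic information
    }

    /** Standalone mode needs no resource-manager communication at all. */
    public static class StandaloneClusterClient implements ClusterClient {
        public String frameworkName() { return "standalone"; }
        public String jobManagerAddress() { return "localhost:6123"; }
    }

    /** Talks to a running cluster without caring which ClusterClient it got. */
    public static class FlinkCluster {
        private final ClusterClient client;

        public FlinkCluster(ClusterClient client) { this.client = client; }

        public String describe() {
            return client.frameworkName() + " cluster at " + client.jobManagerAddress();
        }
    }
}
```

The point of the design is visible in `FlinkCluster`: it depends only on the interface, so a `YarnClusterClient` or `MesosClusterClient` could be swapped in without the CliFrontend ever branching on the cluster type.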
[jira] [Commented] (FLINK-3964) Job submission times out with recursive.file.enumeration
[ https://issues.apache.org/jira/browse/FLINK-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358953#comment-15358953 ] ASF GitHub Bot commented on FLINK-3964: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2168 > Job submission times out with recursive.file.enumeration > > > Key: FLINK-3964 > URL: https://issues.apache.org/jira/browse/FLINK-3964 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, DataSet API >Affects Versions: 1.0.0 >Reporter: Juho Autio > > When using "recursive.file.enumeration" with a big enough folder structure to > list, flink batch job fails right at the beginning because of a timeout. > h2. Problem details > We get this error: {{Communication with JobManager failed: Job submission to > the JobManager timed out}}. > The code we have is basically this: > {code} > val env = ExecutionEnvironment.getExecutionEnvironment > val parameters = new Configuration > // set the recursive enumeration parameter > parameters.setBoolean("recursive.file.enumeration", true) > val parameter = ParameterTool.fromArgs(args) > val input_data_path : String = parameter.get("input_data_path", null ) > val data : DataSet[(Text,Text)] = env.readSequenceFile(classOf[Text], > classOf[Text], input_data_path) > .withParameters(parameters) > data.first(10).print > {code} > If we set {{input_data_path}} parameter to {{s3n://bucket/path/date=*/}} it > times out. If we use a more restrictive pattern like > {{s3n://bucket/path/date=20160523/}}, it doesn't time out. > To me it seems that time taken to list files shouldn't cause any timeouts on > job submission level. > For us this was "fixed" by adding {{akka.client.timeout: 600 s}} in > {{flink-conf.yaml}}, but I wonder if the timeout would still occur if we have > even more files to list? > > P.S. Is there any way to set {{akka.client.timeout}} when calling {{bin/flink > run}} instead of editing {{flink-conf.yaml}}. 
I tried to add it as a {{-yD}} > flag but couldn't get it working.
[GitHub] flink pull request #2168: [FLINK-3964] add hint to job submission timeout ex...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2168 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3675) YARN ship folder inconsistent behavior
[ https://issues.apache.org/jira/browse/FLINK-3675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358952#comment-15358952 ] ASF GitHub Bot commented on FLINK-3675: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2187 > YARN ship folder inconsistent behavior > - > > Key: FLINK-3675 > URL: https://issues.apache.org/jira/browse/FLINK-3675 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.0.0 >Reporter: Stefano Baghino >Assignee: Maximilian Michels >Priority: Critical > Fix For: 1.1.0 > > > After [some discussion on the user mailing > list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-YARN-ship-folder-td5458.html] > it came up that the {{flink/lib}} folder is always supposed to be shipped to > the YARN cluster so that all the nodes have access to its contents. > Currently, however, the Flink long-running YARN session actually ships the > folder because it's explicitly specified in the {{yarn-session.sh}} script, > while running a single job on YARN does not automatically ship it.
[GitHub] flink pull request #2187: [FLINK-3675][yarn] improvements to library shippin...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2187
[GitHub] flink pull request #2189: [FLINK-3667] delay connection to JobManager until ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2189
[jira] [Commented] (FLINK-4118) The docker-flink image is outdated (1.0.2) and can be slimmed down
[ https://issues.apache.org/jira/browse/FLINK-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358949#comment-15358949 ] ASF GitHub Bot commented on FLINK-4118: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2176 Yes, this is exactly what I was trying on OS X. I'm quickly setting up an Ubuntu VM to see if it works there. > The docker-flink image is outdated (1.0.2) and can be slimmed down > -- > > Key: FLINK-4118 > URL: https://issues.apache.org/jira/browse/FLINK-4118 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > This issue is to upgrade the docker image and polish some details in it (e.g. > it can be slimmed down if we remove some unneeded dependencies, and the code > can be polished).
[jira] [Commented] (FLINK-3667) Generalize client<->cluster communication
[ https://issues.apache.org/jira/browse/FLINK-3667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358951#comment-15358951 ] ASF GitHub Bot commented on FLINK-3667: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2189 > Generalize client<->cluster communication > - > > Key: FLINK-3667 > URL: https://issues.apache.org/jira/browse/FLINK-3667 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0
[jira] [Closed] (FLINK-4139) Yarn: Adjust parallelism and task slots correctly
[ https://issues.apache.org/jira/browse/FLINK-4139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels closed FLINK-4139. - Resolution: Fixed Fixed via 44b3bc45b382c1f2783e9c17dd76ea2e9bbb40ec > Yarn: Adjust parallelism and task slots correctly > - > > Key: FLINK-4139 > URL: https://issues.apache.org/jira/browse/FLINK-4139 > Project: Flink > Issue Type: Bug > Components: Client, YARN Client >Affects Versions: 1.1.0, 1.0.3 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Fix For: 1.1.0 > > > The Yarn CLI should handle the following situations correctly: > - The user specifies no parallelism -> parallelism is adjusted to #taskSlots > * #nodes. > - The user specifies parallelism but no #taskSlots or too few slots -> > #taskSlots are set such that they meet the parallelism > This functionality was present in Flink 1.0.x, but there were some > glitches in the implementation.
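The two adjustment rules described in FLINK-4139 can be sketched as simple arithmetic. This is a hypothetical illustration of the behavior the issue describes, not the actual Flink YARN CLI code; the class and method names are invented.

```java
// Hypothetical sketch of the FLINK-4139 adjustment rules; not Flink's code.
public class SlotAdjustment {

    /** Rule 1: no parallelism given -> use every slot in the cluster (#taskSlots * #nodes). */
    public static int defaultParallelism(int taskSlotsPerTm, int numTaskManagers) {
        return taskSlotsPerTm * numTaskManagers;
    }

    /** Rule 2: parallelism given but too few slots -> raise slots per TM to cover it. */
    public static int adjustedSlotsPerTm(int requestedParallelism, int taskSlotsPerTm, int numTaskManagers) {
        int available = taskSlotsPerTm * numTaskManagers;
        if (available >= requestedParallelism) {
            return taskSlotsPerTm; // already enough slots, keep the user's setting
        }
        // ceiling division so that slotsPerTm * numTaskManagers >= requestedParallelism
        return (requestedParallelism + numTaskManagers - 1) / numTaskManagers;
    }
}
```

For example, with 3 TaskManagers of 2 slots each, a requested parallelism of 10 exceeds the 6 available slots, so the slots per TaskManager would be raised to 4 (ceiling of 10/3).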
[GitHub] flink issue #2176: [FLINK-4118] The docker-flink image is outdated (1.0.2) a...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2176 Yes, this is exactly what I was trying on OS X. I'm quickly setting up an Ubuntu VM to see if it works there.
[GitHub] flink pull request #2167: [FLINK-4122] Disable root shading in Cassandra jar
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/2167
[jira] [Closed] (FLINK-4122) Cassandra jar contains 2 guava versions
[ https://issues.apache.org/jira/browse/FLINK-4122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-4122. --- Resolution: Fixed Fixed in 96590ffaf650b80e277c3f35d3c63f33362f0cc6 > Cassandra jar contains 2 guava versions > --- > > Key: FLINK-4122 > URL: https://issues.apache.org/jira/browse/FLINK-4122 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4139) Yarn: Adjust parallelism and task slots correctly
Maximilian Michels created FLINK-4139: - Summary: Yarn: Adjust parallelism and task slots correctly Key: FLINK-4139 URL: https://issues.apache.org/jira/browse/FLINK-4139 Project: Flink Issue Type: Bug Components: Client, YARN Client Affects Versions: 1.0.3, 1.1.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Priority: Minor Fix For: 1.1.0 The Yarn CLI should handle the following situations correctly: - The user specifies no parallelism -> parallelism is adjusted to #taskSlots * #nodes. - The user specifies parallelism but no #taskSlots or too few slots -> #taskSlots are set such that they meet the parallelism This functionality was present in Flink 1.0.x, but there were some glitches in the implementation.
[jira] [Closed] (FLINK-4057) Expose JobManager Metrics
[ https://issues.apache.org/jira/browse/FLINK-4057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-4057. --- Resolution: Fixed Implemented in a3a9fd1147aa926987420057f8305ab498519a45, 8829f97344cd9a7a9cfdee4db3d55a2635ff1f31 and 9e540daf6e44c386ca82e6818f87d634be316e6c. > Expose JobManager Metrics > - > > Key: FLINK-4057 > URL: https://issues.apache.org/jira/browse/FLINK-4057 > Project: Flink > Issue Type: Sub-task > Components: JobManager, Metrics >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > > We should expose the following Metrics on the JobManager: > # of running Jobs/Tasks > # of registered TaskManagers > # of used/available TaskSlots > Checkpoint Size > Checkpoint Time -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2167: [FLINK-4122] Disable root shading in Cassandra jar
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2167 merging
[jira] [Commented] (FLINK-4122) Cassandra jar contains 2 guava versions
[ https://issues.apache.org/jira/browse/FLINK-4122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358934#comment-15358934 ] ASF GitHub Bot commented on FLINK-4122: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2167 merging > Cassandra jar contains 2 guava versions > --- > > Key: FLINK-4122 > URL: https://issues.apache.org/jira/browse/FLINK-4122 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager
[ https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358931#comment-15358931 ] ASF GitHub Bot commented on FLINK-1550: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2146 > Show JVM Metrics for JobManager > --- > > Key: FLINK-1550 > URL: https://issues.apache.org/jira/browse/FLINK-1550 > Project: Flink > Issue Type: Sub-task > Components: JobManager, Metrics >Reporter: Robert Metzger >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-1550) Show JVM Metrics for JobManager
[ https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-1550. --- Resolution: Fixed Fix Version/s: (was: pre-apache) 1.1.0 Implemented in fafb981772bff57a153f02fd171d7d15f3a08379 > Show JVM Metrics for JobManager > --- > > Key: FLINK-1550 > URL: https://issues.apache.org/jira/browse/FLINK-1550 > Project: Flink > Issue Type: Sub-task > Components: JobManager, Metrics >Reporter: Robert Metzger >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2146
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358927#comment-15358927 ] ASF GitHub Bot commented on FLINK-3231: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2131 I tested the code, also with a shard-merging: `aws kinesis merge-shards --shard-to-merge shardId-0001 --adjacent-shard-to-merge shardId-0002 --stream-name flink-test` and everything worked nicely, the log statements were good. If you want to improve it a little bit, we should maybe log at debug level each time we discover new shards (just to show that everything is working as expected). The only thing missing are some minor documents, then, I think we are good to merge. > Handle Kinesis-side resharding in Kinesis streaming consumer > > > Key: FLINK-3231 > URL: https://issues.apache.org/jira/browse/FLINK-3231 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector, Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > A big difference between Kinesis shards and Kafka partitions is that Kinesis > users can choose to "merge" and "split" shards at any time for adjustable > stream throughput capacity. This article explains this quite clearly: > https://brandur.org/kinesis-by-example. > This will break the static shard-to-task mapping implemented in the basic > version of the Kinesis consumer > (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task > mapping is done in a simple round-robin-like distribution which can be > locally determined at each Flink consumer task (Flink Kafka consumer does > this too). 
> To handle Kinesis resharding, we will need some way to let the Flink consumer > tasks coordinate which shards they are currently handling, and allow the > tasks to ask the coordinator for a shard reassignment when a task finds > a closed shard at runtime (shards will be closed by Kinesis > when they are merged or split). > We need a centralized coordinator state store which is visible to all Flink > consumer tasks. Tasks can use this state store to locally determine which > shards they can be reassigned. Amazon KCL uses a DynamoDB table for the > coordination, but as described in > https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use > KCL for the implementation of the consumer if we want to leverage Flink's > checkpointing mechanics. For our own implementation, ZooKeeper can be used > for this state store, but that would require the user to set up ZK. > Since this feature involves extensive work, it is opened as a separate > sub-task from the basic implementation > https://issues.apache.org/jira/browse/FLINK-3229.
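A deterministic, non-overlapping shard-to-subtask assignment of the kind the later design settled on (each subtask sees the full shard list and locally decides which shards it owns, with the mapping stable across restores) can be sketched as follows. This is an illustrative assumption, not the actual FlinkKinesisConsumer implementation; the class and method names are invented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a fixed hash rule maps each shard id to exactly one
// subtask index. Because the rule is deterministic, the assignment is stable
// across restores; because indices partition the hash values, no two subtasks
// ever own the same shard. Not the actual consumer code.
public class ShardAssignment {

    public static boolean isOwnedBy(String shardId, int subtaskIndex, int totalSubtasks) {
        // floorMod avoids the negative results a plain % would give for negative hash codes
        return Math.floorMod(shardId.hashCode(), totalSubtasks) == subtaskIndex;
    }

    public static List<String> ownedShards(List<String> allShards, int subtaskIndex, int totalSubtasks) {
        List<String> owned = new ArrayList<>();
        for (String shard : allShards) {
            if (isOwnedBy(shard, subtaskIndex, totalSubtasks)) {
                owned.add(shard);
            }
        }
        return owned;
    }
}
```

With such a rule, a subtask that discovers a newly split shard can decide ownership without asking any coordinator, which is what removes the need for the ZooKeeper/DynamoDB-style state store discussed above.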
[GitHub] flink issue #2131: [FLINK-3231][streaming-connectors] FlinkKinesisConsumer r...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2131 I tested the code, also with a shard-merging: `aws kinesis merge-shards --shard-to-merge shardId-0001 --adjacent-shard-to-merge shardId-0002 --stream-name flink-test` and everything worked nicely, the log statements were good. If you want to improve it a little bit, we should maybe log at debug level each time we discover new shards (just to show that everything is working as expected). The only thing missing are some minor documents, then, I think we are good to merge.
[GitHub] flink pull request #2131: [FLINK-3231][streaming-connectors] FlinkKinesisCon...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r69296319 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -17,157 +17,553 @@ package org.apache.flink.streaming.connectors.kinesis.internals; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Map; -import java.util.HashMap; + +import java.util.LinkedList; import java.util.List; -import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; 
import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. - * The fetcher spawns a single thread for connection to each shard. + * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates + * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following: + * + * 1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset + * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be + * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe + * to the same subset of shards even after restoring) + * 2. decide where in each discovered shard should the fetcher start subscribing to + * 3. subscribe to shards by creating a single thread for each shard + * + * + * The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery), + * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed + * by multiple threads, these operations should only be done using the handler methods provided in this class. 
*/ -public class KinesisDataFetcher { +public class KinesisDataFetcher { private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); - /** Config properties for the Flink Kinesis Consumer */ + // + // Consumer-wide settings + // + + /** Configuration properties for the Flink Kinesis Consumer */ private final Properties configProps; - /** The name of the consumer task that this fetcher was instantiated */ - private final String taskName; + /** The list of Kinesis streams that the consumer is subscribing to */ + private final List streams; + + /** +* The deserialization schema we will be using to convert Kinesis records to Flink objects. +* Note that since this might not be thread-safe, {@link ShardConsumer}s using this must +* clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}. +*/ + private final KinesisDeserializationSchema deserializationSchema; + + //
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer
[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358925#comment-15358925 ] ASF GitHub Bot commented on FLINK-3231: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2131#discussion_r69296319 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---