[GitHub] flink issue #2732: [FLINK-4272] Create a JobClient for job control and monit...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2732 This is still based on old runtime parts (JobManager), though the interface allows it to be ported to the new runtime (JobMaster). As the new one is about to supersede the old one, it might be sensible to port this to the new one first. ---
[GitHub] flink issue #2732: [FLINK-4272] Create a JobClient for job control and monit...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2732 This probably needs an overhaul by now. Have any efforts been undertaken to introduce a job client? ---
[GitHub] flink issue #2917: [FLINK-2821] use custom Akka build to listen on all inter...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2917 >edit: Re-reading the PR description, I actually got it to work by setting jobmanager.rpc.address to the external IP and getting rid of the hostname. Glad you were able to solve the problem! :) Akka requires that all messages are tagged with the same address which was used during initialization of the receiver actor system. While this PR gets rid of the issue that an address might not be available for binding during initialization of the receiver, it still requires you to use a consistent address. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
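The consistent-address requirement described above boils down to a configuration like the following sketch (the keys are Flink's standard configuration options; the IP address is illustrative):

```yaml
# flink-conf.yaml: the address clients use to reach the JobManager must match
# the address the receiver actor system was initialized with.
jobmanager.rpc.address: 203.0.113.10
jobmanager.rpc.port: 6123
```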
[GitHub] flink issue #3086: Improve docker setup
Github user mxm commented on the issue: https://github.com/apache/flink/pull/3086 No, looks good. Thanks. As Greg mentioned, please open a JIRA issue next time. ---
[GitHub] flink issue #3086: Improve docker setup
Github user mxm commented on the issue: https://github.com/apache/flink/pull/3086 @kaelumania `ARG` is only available from Docker 1.10 onwards. I believe docker-compose also allows configuring environment variables, either through the compose file or via a command-line argument. If we refrained from `ARG`, we could keep backwards-compatibility. ---
[GitHub] flink pull request #3086: Improve docker setup
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/3086#discussion_r95411011

--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---

```diff
@@ -28,6 +28,9 @@ if [ "$1" == "jobmanager" ]; then
     echo "config file: " && grep '^[^\n#]' $FLINK_HOME/conf/flink-conf.yaml
     $FLINK_HOME/bin/jobmanager.sh start cluster
+
+    # prevent script to exit
+    tail -f /dev/null
```

--- End diff --

I think the proper way to fix this would be to call a non-daemonized startup script.

---
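A sketch of what such a non-daemonized entrypoint could look like. The `start-foreground` mode is hypothetical here; it assumes the startup script can run the JobManager without daemonizing:

```shell
#!/bin/sh
# Sketch of a non-daemonized entrypoint (the `start-foreground` flag is
# a hypothetical mode, not confirmed by this PR).
entrypoint() {
  case "$1" in
    jobmanager)
      # A foreground process keeps the container alive by itself,
      # so no `tail -f /dev/null` is needed after it.
      exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground cluster
      ;;
    *)
      # Run an arbitrary command passed to `docker run`
      # (a real entrypoint would `exec "$@"` here).
      "$@"
      ;;
  esac
}

entrypoint echo "running custom command"   # prints: running custom command
```

With `exec`, the foreground process becomes PID 1 and receives container stop signals directly.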
[GitHub] flink pull request #3086: Improve docker setup
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/3086#discussion_r95411497

--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---

```diff
@@ -36,9 +39,9 @@ elif [ "$1" == "taskmanager" ]; then
     echo "Starting Task Manager"
     echo "config file: " && grep '^[^\n#]' $FLINK_HOME/conf/flink-conf.yaml
     $FLINK_HOME/bin/taskmanager.sh start
+
+    # prevent script to exit
+    tail -f /dev/null
 else
     $@
```

--- End diff --

@greghogan Seems like a way to execute an arbitrary command inside the Docker container, passed as an argument to `docker run`.

---
[GitHub] flink pull request #3086: Improve docker setup
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/3086#discussion_r95410892

--- Diff: flink-contrib/docker-flink/Dockerfile ---

```diff
@@ -22,9 +22,9 @@ FROM java:8-jre-alpine
 RUN apk add --no-cache bash snappy

 # Configure Flink version
-ENV FLINK_VERSION=1.1.1
-ENV HADOOP_VERSION=27
-ENV SCALA_VERSION=2.11
+ARG FLINK_VERSION=1.1.3
```

--- End diff --

`ARG` is only available in newer versions of Docker. If we want to maintain backwards-compatibility, we should adjust the README to state `docker build --env FLINK_VERSION=1.0.3`. As far as I know, we don't gain anything by using `ARG`.

---
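A sketch of the `ENV`-based alternative discussed here, using the values from the diff. `ENV` is understood by Docker versions older than 1.10, and its values can still be overridden at container start with `docker run -e` (though not at image build time):

```dockerfile
# Configure Flink version via ENV for compatibility with Docker < 1.10;
# override at run time, e.g.: docker run -e FLINK_VERSION=1.1.3 ...
ENV FLINK_VERSION=1.1.3
ENV HADOOP_VERSION=27
ENV SCALA_VERSION=2.11
```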
[GitHub] flink issue #3017: [FLINK-5350] don't overwrite an existing JAAS config
Github user mxm commented on the issue: https://github.com/apache/flink/pull/3017 Thank you @theomega. Merging. ---
[GitHub] flink issue #3016: [FLINK-5344] Fixed the dockerized doc build, which has be...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/3016 @greghogan `docs/build_docs.sh` ---
[GitHub] flink issue #3016: [FLINK-5344] Fixed the dockerized doc build, which has be...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/3016 Had to add this to the root `pom.xml` to convince the Rat plugin to pass:

```xml
docs/ruby2/Gemfile.lock
```

---
[GitHub] flink issue #3016: [FLINK-5344] Fixed the dockerized doc build, which has be...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/3016 Thank you for fixing this, David! I have just verified that this fixes the Buildbot-related issues. It is a shame that we only have Ruby 1.9, but for now we will have to deal with this limitation, especially because we want up-to-date docs for the release :) Merging this. ---
[GitHub] flink issue #2917: [FLINK-2821] use custom Akka build to listen on all inter...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2917 Thank you, will go ahead and merge then. ---
[GitHub] flink pull request #3017: [FLINK-5350] don't overwrite an existing JAAS conf...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/3017

[FLINK-5350] don't overwrite an existing JAAS config

Users may want to use SASL/PLAIN https://tools.ietf.org/html/rfc4616 without Kerberos.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-5350

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3017.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3017

commit 67c154666779609dacca2073fc70c5b7726435b7
Author: Maximilian Michels
Date: 2016-12-15T14:29:21Z

    [FLINK-5350] don't overwrite an existing JAAS config

---
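For context, a user-supplied JAAS configuration for SASL/PLAIN typically contains an entry like the following sketch. The `KafkaClient` entry name and `PlainLoginModule` are the standard Kafka client conventions; the credentials are placeholders, not taken from this PR:

```
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice-secret";
};
```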
[GitHub] flink issue #2917: [FLINK-2821] use custom Akka build to listen on all inter...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2917 I've added the new staging repository to test the PR changes. Also, the repository is currently deploying to Maven central. ---
[GitHub] flink issue #2917: [FLINK-2821] use custom Akka build to listen on all inter...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2917 Thank you @tillrohrmann and @StephanEwen. I've addressed your comments. I'll have to redeploy Flakka because the staging repository this PR used has been dropped in the meantime. I will update the PR tomorrow to use the Maven central servers. ---
[GitHub] flink pull request #2917: [FLINK-2821] use custom Akka build to listen on al...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2917#discussion_r92661183

--- Diff: flink-runtime/pom.xml ---

```diff
@@ -193,8 +193,8 @@ under the License.
-  com.typesafe.akka
-  akka-testkit_${scala.binary.version}
+  com.data-artisans
+  flakka-testkit_${scala.binary.version}
```

--- End diff --

This probably needs to be changed independently of this PR.

---
[GitHub] flink pull request #2917: [FLINK-2821] use custom Akka build to listen on al...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2917#discussion_r92659443

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---

```diff
@@ -216,12 +219,19 @@ object AkkaUtils {
    * identified by hostname.
    *
    * @param configuration instance containing the user provided configuration values
-   * @param hostname of the network interface to listen on
+   * @param hostname of the network interface to bind on
    * @param port to bind to or if 0 then Akka picks a free port automatically
+   * @param externalHostname The host name to expect for Akka messages
+   * @param externalPort The port to expect for Akka messages
    * @return Flink's Akka configuration for remote actor systems
    */
   private def getRemoteAkkaConfig(configuration: Configuration,
-      hostname: String, port: Int): Config = {
+      hostname: String, port: Int,
+      externalHostname: String, externalPort: Int): Config = {
+
+    LOG.info(s"Using binding address $hostname:$port" +
```

--- End diff --

That's right. Removing this statement because this is also logged at JobManager startup.

---
[GitHub] flink pull request #2917: [FLINK-2821] use custom Akka build to listen on al...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2917#discussion_r92658704

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---

```diff
@@ -216,12 +219,19 @@ object AkkaUtils {
    * identified by hostname.
    *
    * @param configuration instance containing the user provided configuration values
-   * @param hostname of the network interface to listen on
+   * @param hostname of the network interface to bind on
    * @param port to bind to or if 0 then Akka picks a free port automatically
+   * @param externalHostname The host name to expect for Akka messages
+   * @param externalPort The port to expect for Akka messages
    * @return Flink's Akka configuration for remote actor systems
    */
   private def getRemoteAkkaConfig(configuration: Configuration,
-      hostname: String, port: Int): Config = {
+      hostname: String, port: Int,
```

--- End diff --

+1

---
[GitHub] flink pull request #2917: [FLINK-2821] use custom Akka build to listen on al...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2917#discussion_r92658254

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---

```diff
@@ -102,21 +102,24 @@ object AkkaUtils {
    * specified, then the actor system will listen on the respective address.
    *
    * @param configuration instance containing the user provided configuration values
-   * @param listeningAddress optional tuple of hostname and port to listen on. If None is given,
-   *                         then an Akka config for local actor system will be returned
+   * @param externalAddress optional tuple of hostname and port to be reachable at.
+   *                        If None is given, then an Akka config for local actor system
+   *                        will be returned
    * @return Akka config
    */
   @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
-      listeningAddress: Option[(String, Int)]): Config = {
+      externalAddress: Option[(String, Int)]): Config = {
     val defaultConfig = getBasicAkkaConfig(configuration)
-    listeningAddress match {
+    externalAddress match {
       case Some((hostname, port)) =>
-        val ipAddress = InetAddress.getByName(hostname)
-        val hostString = "\"" + NetUtils.ipAddressToUrlString(ipAddress) + "\""
-        val remoteConfig = getRemoteAkkaConfig(configuration, hostString, port)
+
+        val remoteConfig = getRemoteAkkaConfig(configuration,
+          NetUtils.getWildcardIPAddress, port,
```

--- End diff --

+1

---
[GitHub] flink pull request #3013: [FLINK-5344] relax spec for requested ruby version...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/3013#discussion_r92653981

--- Diff: docs/build_docs.sh ---

```diff
@@ -46,8 +46,8 @@ DOCS_DST=${DOCS_SRC}/content
 JEKYLL_CMD="build"

 # if -p flag is provided, serve site on localhost
-# -i is like -p, but incremental (which has some issues, but is very fast)
-while getopts ":p:i" opt; do
+# -i is like -p, but incremental (only rebuilds the modified file)
+while getopts "pi" opt; do
```

--- End diff --

Does this fix the `-p` argument? On master, only the `-i` argument works.

---
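The behavioral difference can be sketched with a small illustrative helper (the function name is mine, not from the script): the new spec `"pi"` treats both flags as plain booleans, whereas the old spec `":p:i"` made `-p` consume the following word as an argument.

```shell
#!/bin/sh
# Illustrative helper: parse -p (serve) and -i (incremental) as plain
# boolean flags, as the new getopts spec "pi" does.
parse_flags() {
  OPTIND=1
  SERVE=false
  INCREMENTAL=false
  while getopts "pi" opt "$@"; do
    case $opt in
      p) SERVE=true ;;
      i) INCREMENTAL=true ;;
    esac
  done
}

parse_flags -p
echo "serve=$SERVE incremental=$INCREMENTAL"   # prints: serve=true incremental=false
```

With the old spec `":p:i"`, `getopts` would have stored the word following `-p` in `OPTARG`, so a bare `-p` could silently swallow the next flag.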
[GitHub] flink pull request #3013: [FLINK-5344] relax spec for requested ruby version...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/3013#discussion_r92654203

--- Diff: docs/Gemfile ---

```diff
@@ -17,13 +17,14 @@ source 'https://rubygems.org'
-ruby '~> 2.3.0'
+ruby '~> 2'
```

--- End diff --

When we upgraded this from `~> 1.9`, the nightly Buildbot stopped working: https://ci.apache.org/builders/flink-docs-master/builds/557/steps/Flink%20docs/logs/stdio

---
[GitHub] flink issue #2928: [FLINK-5108] Remove ClientShutdownHook during job executi...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2928 Thanks for your understanding :) Could you please close this PR? ---
[GitHub] flink issue #2997: [FLINK-5240][tests] ensure state backends are properly cl...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2997 Thanks for the review, will merge then. ---
[GitHub] flink pull request #3007: [FLINK-4922][docs] document how to use Flink on Me...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/3007

[FLINK-4922][docs] document how to use Flink on Mesos

This adds some initial user documentation for Flink on Mesos.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-4922

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3007.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3007

commit 09ff67e9b5d1d1b0bbed569596b73a2d6429f910
Author: Maximilian Michels
Date: 2016-12-14T14:14:50Z

    [FLINK-4922][docs] document how to use Flink on Mesos

commit 0b472b98e91e938a7e00ff36e3e0cbff2208fd31
Author: Maximilian Michels
Date: 2016-12-14T14:35:30Z

    cleanup Mesos configuration entries

---
[GitHub] flink issue #2928: [FLINK-5108] Remove ClientShutdownHook during job executi...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2928 There is one problem we overlooked. In detached mode we ensure cluster shutdown through a message sent by the client during job submission, telling the JobManager that this is the last job it has to execute. In interactive execution mode, the user jar can contain multiple jobs; this is mostly useful for interactive batch jobs. Since we just execute the main method of the user jar, we don't know how many jobs are submitted or when to shut down the cluster. That's why we chose to delegate the shutdown to the client for interactive jobs. Thus, I'm hesitant to remove the shutdown hook because it ensures that the cluster shuts down during interactive job executions. It prevents clusters from lingering around when the client shuts down. A couple of solutions for this problem:

1. The JobManager watches the client and shuts down a) if it loses the connection to the client and the job it executes has completed, or b) when the client tells the JobManager to shut down.
2. The JobManager drives the execution, which is now part of the client.
3. We don't allow multiple jobs to execute. Then we always have a clear shutdown point. This is perhaps the easiest and most elegant solution. Most users only execute a single job at a time anyway. We can still allow interactive job executions if the user chooses to. Perhaps we can make this more explicit in the API to give a hint to the client.

I'm afraid we will have to close this PR until we realize one of the above solutions (or another one).

---
[GitHub] flink pull request #2997: [FLINK-5240][tests] ensure state backends are prop...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2997

[FLINK-5240][tests] ensure state backends are properly closed

This adds additional test cases to verify the state backends are closed properly upon the end of a task. The state backends should always be closed regardless of the final state of the task.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-5240

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2997.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2997

commit 5162c26d4cf59be8f997ee1e99200ff143f13db2
Author: Maximilian Michels
Date: 2016-12-13T14:21:31Z

    [FLINK-5240][tests] ensure state backends are properly closed

    This adds additional test cases to verify the state backends are closed properly upon the end of a task. The state backends should always be closed regardless of the final state of the task.

---
[GitHub] flink issue #2928: [FLINK-5108] Remove ClientShutdownHook during job executi...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2928 I will go ahead and merge this PR since there have been no further comments. ---
[GitHub] flink pull request #2981: [docker] improve Dockerfile host configuration
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2981

[docker] improve Dockerfile host configuration

- configure job manager address for both operation modes
- introduce argument to specify the external job manager address
- replace ARG with ENV for backwards-compatibility
- EXPOSE web port and RPC port

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink docker

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2981.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2981

commit b0e19d6c2d5bea45e2b87ff63c01c07996ef665c
Author: Maximilian Michels
Date: 2016-12-09T16:58:30Z

    [docker] improve Dockerfile host configuration

    - configure job manager address for both operation modes
    - introduce argument to specify the external job manager address
    - replace ARG with ENV for backwards-compatibility
    - EXPOSE web port and RPC port

---
[GitHub] flink issue #2917: [FLINK-2821] use custom Akka build to listen on all inter...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2917 @StephanEwen I've updated the pull request to incorporate your suggestions. When an IPv6 address is specified, we format it like in the current code base. When a hostname is specified, we do some simple validation but do not resolve it. IPv4 addresses are simply used as-is. I hope that this PR becomes more mergeable in this state. With the help of some eager users we can further test this in a cluster environment. ---
[GitHub] flink pull request #2951: [docs] clarify default restart behavior when check...
Github user mxm closed the pull request at: https://github.com/apache/flink/pull/2951 ---
[GitHub] flink issue #2951: [docs] clarify default restart behavior when checkpointin...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2951 Sorry, I missed your comment. We can open another PR to clarify that! ---
[GitHub] flink issue #2954: [docs] Note that numberOfExecutionRetries and executionRe...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2954 Merged in c024b0b6cae16a0b668d864c77e923820c262087 but forgot to include the "This closes #2954" message in the commit. Could you close the PR? ---
[GitHub] flink pull request #2951: [docs] clarify default restart behavior when check...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2951

[docs] clarify default restart behavior when checkpointing is enabled

Merging this to `master` and `release-1.1`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink docs

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2951.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2951

commit 9e1c9798665bf66a8299459b36a01cb330e977ce
Author: Maximilian Michels
Date: 2016-12-06T14:12:22Z

    [docs] clarify default restart behavior when checkpointing is enabled

---
[GitHub] flink issue #2915: [FLINK-5091] Formalize the Mesos AppMaster environment fo...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2915 Looks really good! Merging with some minor changes. ---
[GitHub] flink issue #2928: [FLINK-5108] Remove ClientShutdownHook during job executi...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2928 I think the original intent of the Client shutdown hook was to make sure that clusters which are spawned and didn't receive jobs are cleaned up again. However, that behavior can be quite tricky because a job might actually have been submitted externally. So +1 for removing it altogether. ---
[GitHub] flink issue #2945: [FLINK-5262][docs] Introduce Gemfile.lock to avoid depend...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2945 I've tested the changes through this build: https://ci.apache.org/builders/flink-docs-FLINK-3887/builds/36 ---
[GitHub] flink pull request #2945: [FLINK-5262][docs] Introduce Gemfile.lock to avoid...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2945 [FLINK-5262][docs] Introduce Gemfile.lock to avoid dependency range conflicts The Gemfile for specifying the Ruby dependencies of our documentation pins fixed dependency versions to avoid incompatible changes across different versions of the dependencies. However, Ruby's dependency management allows artifacts to specify version ranges for their own dependencies. This can be problematic. For instance, we use 'jekyll' version 2.5.3, which depends on 'jekyll-gist' ~> 1.0, meaning 1.0 <= version < 2.0. This may resolve 'jekyll-gist' 1.4.0, which depends on 'octokit' ~> 4.2, i.e. 4.2 <= version < 5.0. Unfortunately, 'octokit' 4.4 and later require Ruby >= 2.0, which is not available on our build servers. Since we already manage these dependencies with Ruby's Bundler, we can mitigate this problem by checking in a Gemfile.lock file which specifies the exact versions of all dependencies required to build the docs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-5262 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2945.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2945 commit b3df07a62317326290d8a3fa1f4d833e0e7b430a Author: Maximilian Michels Date: 2016-12-05T13:32:11Z [FLINK-5262][docs] Introduce Gemfile.lock to avoid dependency range conflicts
The Gemfile for specifying the Ruby dependencies of our documentation pins fixed dependency versions to avoid incompatible changes across different versions of the dependencies. However, Ruby's dependency management allows artifacts to specify version ranges for their own dependencies. This can be problematic. For instance, we use 'jekyll' version 2.5.3, which depends on 'jekyll-gist' ~> 1.0, meaning 1.0 <= version < 2.0. This may resolve 'jekyll-gist' 1.4.0, which depends on 'octokit' ~> 4.2, i.e. 4.2 <= version < 5.0. Unfortunately, 'octokit' 4.4 and later require Ruby >= 2.0, which is not available on our build servers. Since we already manage these dependencies with Ruby's Bundler, we can mitigate this problem by checking in a Gemfile.lock file which specifies the exact versions of all dependencies required to build the docs. ---
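The range semantics described above are why pinning only direct dependency versions is insufficient: the `~>` ("pessimistic") constraints of transitive dependencies still float. A minimal Gemfile of the kind in question might look like this (the jekyll/jekyll-gist versions are taken from the description above; everything else is illustrative):

```ruby
# Gemfile: direct dependencies are pinned, but transitive ranges are not.
source 'https://rubygems.org'

gem 'jekyll', '2.5.3'  # itself depends on jekyll-gist ~> 1.0,
                       # which in turn may pull in octokit ~> 4.2
```

Running `bundle install` resolves the full dependency graph and writes Gemfile.lock, which records the exact version of every transitive gem (e.g. which octokit release was chosen). Checking that file in means `bundle install` on the build servers reproduces exactly the same set, which is the fix this PR applies.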
[GitHub] flink issue #2928: [FLINK-5108] Remove ClientShutdownHook during job executi...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2928 Thanks for the PR! This removes the shutdown hook after the cluster has been deployed. The original intent was to remove it only after job submission. It probably makes sense to remove it altogether. I have to think about it. Perhaps @rmetzger could comment on this as well. ---
[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2915#discussion_r90664938 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/SSLStoreOverlay.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + + +/** + * Overlays an SSL keystore/truststore into a container. 
+ * + * The following files are placed into the container: + * - keystore.jks + * - truststore.jks + * + * The following Flink configuration entries are set: + * - security.ssl.keystore + * - security.ssl.truststore + */ +public class SSLStoreOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(SSLStoreOverlay.class); + + static final Path TARGET_KEYSTORE_PATH = new Path("keystore.jks"); + static final Path TARGET_TRUSTSTORE_PATH = new Path("truststore.jks"); + + final Path keystore; + final Path truststore; + + public SSLStoreOverlay(@Nullable File keystoreFile, @Nullable File truststoreFile) { + this.keystore = keystoreFile != null ? new Path(keystoreFile.toURI()) : null; + this.truststore = truststoreFile != null ? new Path(truststoreFile.toURI()) : null; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(keystore != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(keystore) + .setDest(TARGET_KEYSTORE_PATH) + .setCachable(false) + .build()); + container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_KEYSTORE, TARGET_KEYSTORE_PATH.getPath()); + } + if(truststore != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(truststore) + .setDest(TARGET_TRUSTSTORE_PATH) + .setCachable(false) + .build()); + container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, TARGET_TRUSTSTORE_PATH.getPath()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** +* A builder for the {@link Krb5ConfOverlay}. +*/ + public static class Builder { + + File keystorePath; + + File truststorePath; + + /** +* Configures the overlay using the current environment (and global configuration). 
+* +* The following Flink configuration settings are used to source the keystore and truststore: +* - security.ssl.keystore +* - security.ssl.truststore + */ --- End diff -- indention is off here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2915#discussion_r90454802 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/AbstractContainerOverlay.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; + +/** + * An abstract container overlay. --- End diff -- Could you elaborate a bit here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2915#discussion_r90664759 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/HadoopUserOverlay.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; + +/** + * Overlays a Hadoop user context into a container. + * + * The overlay essentially configures Hadoop's {@link UserGroupInformation} class, + * establishing the effective username for filesystem calls to HDFS in non-secure clusters. + * + * In secure clusters, the configured keytab establishes the effective user. 
+ * + * The following environment variables are set in the container: + * - HADOOP_USER_NAME + */ +public class HadoopUserOverlay implements ContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopUserOverlay.class); + + private final UserGroupInformation ugi; + + public HadoopUserOverlay(@Nullable UserGroupInformation ugi) { + this.ugi = ugi; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(ugi != null) { + // overlay the Hadoop user identity (w/ tokens) + container.getEnvironmentVariables().put("HADOOP_USER_NAME", ugi.getUserName()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** +* A builder for the {@link HadoopUserOverlay}. +*/ + public static class Builder { + + UserGroupInformation ugi; + + /** +* Configures the overlay using the current Hadoop user information (from {@link UserGroupInformation}). + */ --- End diff -- indention is off here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2915#discussion_r90666953 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -542,42 +542,10 @@ protected static File asFile(String path) { return configs; } - // This code is taken from: http://stackoverflow.com/a/7201825/568695 - // it changes the environment variables of this JVM. Use only for testing purposes! - @SuppressWarnings("unchecked") public static void setEnv(Map newenv) { --- End diff -- How about removing this method and redirecting the calls? Was there a dependency management reason you didn't want to do that? ---
[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2915#discussion_r90454287 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/ContainerOverlay.java --- @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.runtime.clusterframework.ContainerSpecification; + +import java.io.IOException; + +/** + * A container overlay to produce a container specification. + * + * An overlay applies configuration elements, environment variables, + * system properties, and artifacts to a container specification. + */ +public interface ContainerOverlay { + + /** +* Configure the given container specification. + */ --- End diff -- Indention is off here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2915#discussion_r90664795 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/Krb5ConfOverlay.java --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + + +/** + * Overlays a Kerberos configuration file into a container. 
+ * + * The following files are copied to the container: + * - krb5.conf + * + * The following Java system properties are set in the container: + * - java.security.krb5.conf + */ +public class Krb5ConfOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(Krb5ConfOverlay.class); + + static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; + + static final Path TARGET_PATH = new Path("krb5.conf"); + final Path krb5Conf; + + public Krb5ConfOverlay(@Nullable File krb5Conf) { + this.krb5Conf = krb5Conf != null ? new Path(krb5Conf.toURI()) : null; + } + + public Krb5ConfOverlay(@Nullable Path krb5Conf) { + this.krb5Conf = krb5Conf; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(krb5Conf != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(krb5Conf) + .setDest(TARGET_PATH) + .setCachable(true) + .build()); + container.getSystemProperties().setString(JAVA_SECURITY_KRB5_CONF, TARGET_PATH.getPath()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** +* A builder for the {@link Krb5ConfOverlay}. +*/ + public static class Builder { + + File krb5ConfPath; + + /** +* Configures the overlay using the current environment. +* +* Locates the krb5.conf configuration file as per +* https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html";>Java documentation. +* Note that the JRE doesn't support the KRB5_CONFIG environment variable (JDK-7045913). + */ --- End diff -- indention is off here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2915#discussion_r90665396 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +import static org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay.TARGET_CONF_DIR; + +public class HadoopConfOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File confDir = tempFolder.newFolder(); + initConfDir(confDir); + + HadoopConfOverlay overlay = new HadoopConfOverlay(confDir); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR")); + assertEquals(TARGET_CONF_DIR.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null)); + + checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml")); + checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml")); + } + + @Test + public void testNoConf() throws Exception { + HadoopConfOverlay overlay = new HadoopConfOverlay(null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } + + @Test + public void testBuilderFromEnvironment() throws Exception { --- End diff -- indention is off here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2915#discussion_r90666707 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/HadoopConfOverlayTest.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +import static org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay.TARGET_CONF_DIR; + +public class HadoopConfOverlayTest extends ContainerOverlayTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testConfigure() throws Exception { + + File confDir = tempFolder.newFolder(); + initConfDir(confDir); + + HadoopConfOverlay overlay = new HadoopConfOverlay(confDir); + + ContainerSpecification spec = new ContainerSpecification(); + overlay.configure(spec); + + assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR")); + assertEquals(TARGET_CONF_DIR.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null)); + + checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml")); + checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml")); + } + + @Test + public void testNoConf() throws Exception { + HadoopConfOverlay overlay = new HadoopConfOverlay(null); + + ContainerSpecification containerSpecification = new ContainerSpecification(); + overlay.configure(containerSpecification); + } + + @Test + public void testBuilderFromEnvironment() throws Exception { + + // verify that the builder picks up various environment locations + HadoopConfOverlay.Builder builder; + Map env; + + // fs.hdfs.hadoopconf + File confDir = tempFolder.newFolder(); + initConfDir(confDir); + Configuration 
conf = new Configuration(); + conf.setString(ConfigConstants.PATH_HADOOP_CONFIG, confDir.getAbsolutePath()); + builder = HadoopConfOverlay.newBuilder().fromEnvironment(conf); + assertEquals(confDir, builder.hadoopConfDir); + + // HADOOP_CONF_DIR + env = new HashMap(System.getenv()); + env.remove("HADOOP_HOME"); + env.put("HADOOP_CONF_DIR", confDir.getAbsolutePath()); + CommonTestUtils.setEnv(env); + builder = HadoopConfOverlay.newBuilder().fromEnvironment(new Configuration()); + assertEquals(confDir, builder.hadoopConfDir); + + // HADOOP_HOME/conf + File homeDir = tempFolder.newFolder(); + confDir = initConfDir(new File(homeDir, "conf")); + env = new HashMap(System.getenv()); + env.remove("HADOOP_CONF_DIR"); + env.put("HADOOP_HOME", homeDir.getAbsolutePath()); --- End diff -- Should we restore the original environment for all test cases in an @AfterTest method? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not h
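The snapshot-and-restore pattern suggested in the review comment above can be sketched as follows. This is a simplified, self-contained sketch: a plain map stands in for the JVM environment that the real tests mutate via `CommonTestUtils.setEnv`, and the two methods correspond to what `@Before`/`@After` (or `@AfterTest`) callbacks would do:

```java
import java.util.HashMap;
import java.util.Map;

public class EnvSnapshot {

    // Stand-in for the mutable environment view that the real tests
    // rewrite via CommonTestUtils.setEnv; a plain map keeps this runnable.
    static final Map<String, String> env = new HashMap<>();

    private static Map<String, String> snapshot;

    /** @Before equivalent: capture the environment before the test mutates it. */
    public static void snapshotEnv() {
        snapshot = new HashMap<>(env);
    }

    /** @After equivalent: restore the snapshot so later tests see the original values. */
    public static void restoreEnv() {
        env.clear();
        env.putAll(snapshot);
    }
}
```

In the actual test base class, `restoreEnv` would call the existing `setEnv` utility with the saved copy of `System.getenv()`, so that tests which clear `HADOOP_HOME` or `HADOOP_CONF_DIR` do not leak those changes into subsequent test cases.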
[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2915#discussion_r90664693 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/FlinkDistributionOverlay.java --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_BIN_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_HOME_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Overlays Flink into a container, based on supplied bin/conf/lib directories. 
+ * + * The overlayed Flink is indistinguishable from (and interchangeable with) + * a normal installation of Flink. For a docker image-based container, it should be + * possible to bypass this overlay and rely on the normal installation method. + * + * The following files are copied to the container: + * - flink/bin/ + * - flink/conf/ + * - flink/lib/ + */ +public class FlinkDistributionOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkDistributionOverlay.class); + + static final Path TARGET_ROOT = new Path("flink"); + + final File flinkBinPath; + final File flinkConfPath; + final File flinkLibPath; + + public FlinkDistributionOverlay(File flinkBinPath, File flinkConfPath, File flinkLibPath) { + this.flinkBinPath = checkNotNull(flinkBinPath); + this.flinkConfPath = checkNotNull(flinkConfPath); + this.flinkLibPath = checkNotNull(flinkLibPath); + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + + container.getEnvironmentVariables().put(ENV_FLINK_HOME_DIR, TARGET_ROOT.toString()); + + // add the paths to the container specification. + addPathRecursively(flinkBinPath, TARGET_ROOT, container); + addPathRecursively(flinkConfPath, TARGET_ROOT, container); + addPathRecursively(flinkLibPath, TARGET_ROOT, container); + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** +* A builder for the {@link FlinkDistributionOverlay}. +*/ + public static class Builder { + File flinkBinPath; + File flinkConfPath; + File flinkLibPath; + + /** +* Configures the overlay using the current environment. +* +* Locates Flink using FLINK_???_DIR environment variables as provided to all Flink processes by config.sh. +* +* @param globalConfiguration the current configuration. + */ --- End diff -- indention is off here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2915#discussion_r90664782 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlay.java --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.overlays; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; + + +/** + * Overlays cluster-level Kerberos credentials (i.e. keytab) into a container. + * + * The folloowing Flink configuration entries are updated: + * - security.keytab + */ +public class KeytabOverlay extends AbstractContainerOverlay { + + private static final Logger LOG = LoggerFactory.getLogger(KeytabOverlay.class); + + static final Path TARGET_PATH = new Path("krb5.keytab"); + + final Path keytab; + + public KeytabOverlay(@Nullable File keytab) { + this.keytab = keytab != null ? 
new Path(keytab.toURI()) : null; + } + + public KeytabOverlay(@Nullable Path keytab) { + this.keytab = keytab; + } + + @Override + public void configure(ContainerSpecification container) throws IOException { + if(keytab != null) { + container.getArtifacts().add(ContainerSpecification.Artifact.newBuilder() + .setSource(keytab) + .setDest(TARGET_PATH) + .setCachable(false) + .build()); + container.getDynamicConfiguration().setString(ConfigConstants.SECURITY_KEYTAB_KEY, TARGET_PATH.getPath()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** +* A builder for the {@link HadoopUserOverlay}. +*/ + public static class Builder { + + File keytabPath; + + /** +* Configures the overlay using the current environment (and global configuration). +* +* The following Flink configuration settings are checked for a keytab: +* - security.keytab + */ --- End diff -- indentation is off here ---
[GitHub] flink issue #2917: [FLINK-2821] use custom Akka build to listen on all inter...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2917 >Can we try and preserve both in some way? When a logical hostname is specified, we use that one in the Akka URLs. When an IP address is specified, we normalize it and use it. Detecting whether we have an IPv4/IPv6 address or a hostname should be possible. Let me see if I can incorporate that in the PR. ---
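The detection mentioned in the comment above could be sketched roughly as follows. This is only an illustration of the idea, not the PR's actual code; the class and method names are made up:

```java
import java.util.regex.Pattern;

/**
 * Hypothetical helper sketching hostname-vs-IP detection: treat the
 * configured value as an IP literal if it parses as one, otherwise
 * as a logical hostname. Illustrative only, not part of the PR.
 */
public class AddressKind {

    // Loose IPv4 pattern: four dot-separated decimal groups.
    private static final Pattern IPV4 = Pattern.compile("(\\d{1,3}\\.){3}\\d{1,3}");

    static boolean isIpLiteral(String value) {
        // Strip URL-style brackets around IPv6 literals, e.g. "[::1]".
        String v = value.startsWith("[") && value.endsWith("]")
                ? value.substring(1, value.length() - 1) : value;
        if (v.contains(":")) {
            return true; // only IPv6 literals contain colons
        }
        return IPV4.matcher(v).matches(); // IPv4 literal, otherwise a hostname
    }

    public static void main(String[] args) {
        System.out.println(isIpLiteral("192.168.0.1")); // true: normalize as IP
        System.out.println(isIpLiteral("[::1]"));       // true: IPv6 literal
        System.out.println(isIpLiteral("jobmanager"));  // false: logical hostname
    }
}
```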
[GitHub] flink pull request #2917: [FLINK-2821] use custom Akka build to listen on al...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2917#discussion_r90627441 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1891,8 +1891,8 @@ object JobManager { // parsing the command line arguments val (configuration: Configuration, executionMode: JobManagerMode, - listeningHost: String, - listeningPortRange: java.util.Iterator[Integer]) = + reachableHost: String, --- End diff -- Agreed. I've switched terms during some iterations on the changes. I think `externalHostname` is the better alternative. ---
[GitHub] flink issue #2917: [FLINK-2821] use custom Akka build to listen on all inter...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2917 Thanks for checking out the code! >If different parts of the code or the JDK do a subtle change of behavior (i.e. resolve the InetAddress), then some nodes may have a hostname in the URL, others an address. Fair point. Let's remove InetAddress and use a String instead. >If two machines have a slightly different network configuration (especially concerning preferences to represent/encode IPv6 addresses) and the users set the IP address as the JobManager host, then they might create different Akka URLs and the machines cannot talk to each other again. A problem of the old method was that hostnames might resolve differently depending on the container context, so that method was not reliable either. Using IP addresses will work with the new method; they just have to be consistent across all node configurations. I don't think that is a problem because a configuration is usually created once and then copied over to all nodes. >I am a bit unsure how to proceed from here. Is there any way we can keep using IP addresses in the Akka URLs? Or does that just inherently not make sense with "dynamic hostnames" as they are used in container environments? For this PR to address the core problems of FLINK-2821, we have to avoid resolving the hostname, because the resolved IP address would represent the internal container address, which may be unreachable from the outside; even if it were reachable, Akka would drop the messages because of its exact URL match policy. Using the hostname, which now acts as a purely logical address, is ultimately more reliable across different network environments. We just have to make sure we document the new requirement that the JobManager address (JOB_MANAGER_IPC_ADDRESS) is consistent across all Flink cluster nodes. ---
[GitHub] flink pull request #2917: [FLINK-2821] use custom Akka build to listen on al...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2917#discussion_r90621937 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -2729,11 +2727,11 @@ object JobManager { def getRemoteJobManagerAkkaURL(config: Configuration) : String = { val (protocol, hostname, port) = TaskManager.getAndCheckJobManagerAddress(config) -var hostPort: InetSocketAddress = null +var hostPort: InetSocketAddress = new InetSocketAddress(hostname, port) --- End diff -- It is just a check to see if we expect messages from a resolvable hostname. It is not strictly necessary, but I found that test cases rely on it. Also, it improves error reporting for the user in case a non-resolvable hostname was chosen. ---
[GitHub] flink pull request #2917: [FLINK-2821] use custom Akka build to listen on al...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2917#discussion_r90621698 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1891,8 +1891,8 @@ object JobManager { // parsing the command line arguments val (configuration: Configuration, executionMode: JobManagerMode, - listeningHost: String, - listeningPortRange: java.util.Iterator[Integer]) = + reachableHost: String, --- End diff -- It is the consistent external hostname of the JobManager. ---
[GitHub] flink pull request #2908: [maven] properly attach the CEP Scala source code
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2908#discussion_r90469083 --- Diff: flink-libraries/flink-cep-scala/pom.xml --- @@ -93,6 +93,8 @@ under the License. + +src/main/scala --- End diff -- Yes, one of the two works. The easiest fix is this one-liner, though I'm not sure about Eclipse. ---
[GitHub] flink pull request #2917: [FLINK-2821] use custom Akka build to listen on al...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2917 [FLINK-2821] use custom Akka build to listen on all interfaces This uses Flakka (a custom Akka 2.3 build) to resolve the issue that the bind address needs to match the external address of the JobManager. With the changes applied, we can now bind to all interfaces, e.g. via 0.0.0.0 (IPv4) or :: (IPv6). For this to work properly, the configuration entry JOB_MANAGER_IPC_ADDRESS now represents the external address of the JobManager. Consequently, it should not be resolved to an IP address anymore because it may not be resolvable from within containerized environments. Akka treats this address as the logical address. Any messages which are not tagged with this address will be received by the Actor System (because we listen on all interfaces) but will be dropped subsequently. In addition, we need the external address for the JobManager to be able to publish its address to Zookeeper for HA setups. Flakka: https://github.com/mxm/flakka Patch applied: akka/akka#15610 You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-2821.master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2917.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2917 commit 1ab82041f01df10b8e86da7e9b2696dd175f7e89 Author: Maximilian Michels Date: 2016-11-16T14:50:01Z [FLINK-2821] use custom Akka build to listen on all interfaces This uses Flakka (a custom Akka 2.3 build) to resolve the issue that the bind address needs to match the external address of the JobManager. With the changes applied, we can now bind to all interfaces, e.g. via 0.0.0.0 (IPv4) or :: (IPv6). For this to work properly, the configuration entry JOB_MANAGER_IPC_ADDRESS now represents the external address of the JobManager.
Consequently, it should not be resolved to an IP address anymore because it may not be resolvable from within containerized environments. Akka treats this address as the logical address. Any messages which are not tagged with this address will be received by the Actor System (because we listen on all interfaces) but will be dropped subsequently. In addition, we need the external address for the JobManager to be able to publish it to Zookeeper for HA setups. Flakka: https://github.com/mxm/flakka Patch applied: https://github.com/akka/akka/pull/15610 commit 10b66ff80fec27102417e675c0e99cbad11abfc3 Author: Maximilian Michels Date: 2016-11-30T18:21:26Z use staging repository for now commit 9b8059e1b7217f56fc277a9ac886dd6150190045 Author: Maximilian Michels Date: 2016-12-01T14:50:11Z adapt config and test cases ---
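The consistency requirement described above amounts to a configuration sketch like the following. The hostname and port values are purely illustrative; `jobmanager.rpc.address` is the flink-conf.yaml key behind JOB_MANAGER_IPC_ADDRESS:

```yaml
# flink-conf.yaml -- must be identical on every node of the cluster.
# The value is treated as a logical address: Akka accepts only messages
# tagged with exactly this address, even though the JobManager now binds
# to all interfaces.
jobmanager.rpc.address: flink-master.example.com
jobmanager.rpc.port: 6123
```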
[GitHub] flink issue #2900: Rebased: Keytab & TLS support for Flink on Mesos Setup
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2900 Tests passed. Merging. ---
[GitHub] flink pull request #2908: [maven] properly attach the CEP Scala source code
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2908 [maven] properly attach the CEP Scala source code Two options: either change the default Maven source directory from 'src/main/java' to 'src/main/scala' or use the build-helper-maven-plugin to attach the Scala sources. Opting for both here to be in line with Maven standards and support Eclipse. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink cep-source Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2908.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2908 commit 9b26a6308e185a83df9d482539ed803937efd66b Author: Maximilian Michels Date: 2016-11-30T11:20:40Z [maven] properly attach the CEP Scala source code Two options: either change the default Maven source directory from 'src/main/java' to 'src/main/scala' or use the build-helper-maven-plugin to attach the Scala sources. Opting for both here to be in line with Maven standards and support Eclipse. ---
[GitHub] flink issue #2900: Rebased: Keytab & TLS support for Flink on Mesos Setup
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2900 Rebased again to the latest Mesos changes to make sure tests pass. ---
[GitHub] flink pull request #2900: Rebased: Keytab & TLS support for Flink on Mesos S...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2900 Rebased: Keytab & TLS support for Flink on Mesos Setup Rebased #2734 to the latest master. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-4826 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2900.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2900 commit 5ccd09cf30e4b4def0e333ead7509efe5515519a Author: Vijay Srinivasaraghavan Date: 2016-10-13T22:45:35Z FLINK-4826 Added keytab support to mesos container commit 518de5e37f28b7060b534cae95655b063b4e2d36 Author: Vijay Srinivasaraghavan Date: 2016-10-31T16:54:03Z FLINK-4918 Added SSL handler to artifact server ---
[GitHub] flink issue #2827: [FLINK-4921] Upgrade to Mesos 1.0.1
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2827 Makes sense then. Do distributions like DC/OS already ship Mesos 1.0.1? ---
[GitHub] flink issue #2827: [FLINK-4921] Upgrade to Mesos 1.0.1
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2827 Looks good to me. Do we lose backwards compatibility for Mesos installations < 1.0? ---
[GitHub] flink issue #2734: Keytab & TLS support for Flink on Mesos Setup
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2734 Sorry this is taking so long. Could you please rebase to the latest master? @EronWright Could you take a look at the changes? ---
[GitHub] flink issue #2836: [FLINK-5092] Add maven profile with code coverage report ...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2836 It does not: https://github.com/jacoco/jacoco/wiki/MavenMultiModule That's a bummer, but apparently there are some workarounds: http://www.thinkcode.se/blog/2012/02/18/test-coverage-in-a-multi-module-maven-project On Fri, Nov 25, 2016 at 1:38 PM, zentol wrote: > The plugin cannot detect cross-module coverage, correct? as in, all the > tests in flink-tests will not contribute in any way to the coverage of > other modules? ---
[GitHub] flink pull request #2864: [FLINK-5055][security] skip Hadoop UGI login if un...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2864 [FLINK-5055][security] skip Hadoop UGI login if unsecured The new Kerberos authentication code in Flink assumed that it's running against vanilla Hadoop. Vanilla Hadoop's behavior is to skip a secure login if security is not configured. This is different for other distributions, e.g. the MapR distribution of Hadoop. Thus, we need to make sure we don't perform any login action if security is not configured. This also performs minor code cleanup. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-5055 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2864.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2864 commit 8193024a6451dd2594348ac0f001ed39b80f7302 Author: Maximilian Michels Date: 2016-11-24T16:12:39Z [FLINK-5055][security] skip Hadoop UGI login if unsecured The new Kerberos authentication code in Flink assumed that it's running against vanilla Hadoop. Vanilla Hadoop's behavior is to skip a secure login if security is not configured. This is different for other distributions, e.g. the MapR distribution of Hadoop. Thus, we need to make sure we don't perform any login action if security is not configured. This also performs minor code cleanup. ---
[GitHub] flink issue #2856: Removed excessive tests.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2856 You removed the tests you introduced in #2623. If there had been any tests in `KryoCollectionsSerializerTest` which test `Arrays.asList(..)`, they would have failed prior to merging #2623. Could you add a unit test for `Arrays.asList(..)`? ---
[GitHub] flink issue #2623: [FLINK-2608] Updated Twitter Chill version.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2623 Sure, I'll take care of it. I saw you giving a +1 and didn't see an issue myself. ---
[GitHub] flink issue #2860: [FLINK-5149] let ContinuousEventTimeTrigger fire at the e...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2860 Thanks, I'll leave this open for a bit. I wasn't aware we also had an issue for removing the trigger. I don't know how useful it is to users. ---
[GitHub] flink issue #2803: [FLINK-5061] Remove ContinuousEventTimeTrigger
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2803 I just opened a PR as well :) I was under the assumption that a continuous trigger is useful for early window results. Not sure if it might be confusing for users because you can receive elements newer than the current Watermark. ---
[GitHub] flink issue #2860: [FLINK-5149] let ContinuousEventTimeTrigger fire at the e...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2860 CC @kl0u @aljoscha ---
[GitHub] flink pull request #2860: [FLINK-5149] let ContinuousEventTimeTrigger fire a...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2860 [FLINK-5149] let ContinuousEventTimeTrigger fire at the end of the window This changes the ContinuousEventTimeTrigger to behave like the EventTimeTrigger in the sense that it also triggers at the end of the window. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-5149 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2860.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2860 commit 18d3c2bd2dee225e274d0eda3bc7e5ccbe1ba3df Author: Maximilian Michels Date: 2016-11-23T15:01:35Z [FLINK-5149] let ContinuousEventTimeTrigger fire at the end of the window This changes the ContinuousEventTimeTrigger to behave like the EventTimeTrigger in the sense that it also triggers at the end of the window. ---
[GitHub] flink pull request #2854: [typo] fix toString() of ContinuousEventTimeTrigge...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2854 [typo] fix toString() of ContinuousEventTimeTrigger You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2854.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2854 commit 2da1a251d695315ee4312928307a55971f3ed215 Author: Maximilian Michels Date: 2016-11-23T11:02:09Z [typo] fix toString() of ContinuousEventTimeTrigger ---
[GitHub] flink pull request #2836: [FLINK-5092] Add maven profile with code coverage ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2836#discussion_r89284152 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml --- @@ -193,9 +192,7 @@ under the License. 1 - - 0${surefire.forkNumber} - + -Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -XX:-UseGCOverheadLimit --- End diff -- I think this can be reduced to ```xml @{argLine} -Xmx1000m ``` ---
[GitHub] flink pull request #2836: [FLINK-5092] Add maven profile with code coverage ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2836#discussion_r89282667 --- Diff: pom.xml --- @@ -996,10 +996,9 @@ under the License. ${flink.reuseForks} 0${surefire.forkNumber} - 0${surefire.forkNumber} --- End diff -- Actually, this is probably fine. Could you re-add this line and remove the parameter from the `argLine`? ---
[GitHub] flink pull request #2836: [FLINK-5092] Add maven profile with code coverage ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2836#discussion_r89282258 --- Diff: pom.xml --- @@ -93,7 +93,7 @@ under the License. 1C true log4j-test.properties - -Xms256m -Xmx800m -XX:-UseGCOverheadLimit + --- End diff -- This is not needed anymore. ---
[GitHub] flink pull request #2836: [FLINK-5092] Add maven profile with code coverage ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2836#discussion_r89284243 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml --- @@ -197,9 +196,7 @@ under the License. 1 - - 0${surefire.forkNumber} - + -Xms256m -Xmx1000m -Dlog4j.configuration=${log4j.configuration} -XX:-UseGCOverheadLimit --- End diff -- I think this can be reduced to ```xml @{argLine} -Xmx1000m ``` ---
[GitHub] flink pull request #2836: [FLINK-5092] Add maven profile with code coverage ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2836#discussion_r89284270 --- Diff: flink-streaming-connectors/flink-connector-cassandra/pom.xml --- @@ -50,9 +49,7 @@ under the License. true 1 - - 0${surefire.forkNumber} - + -Xms256m -Xmx2800m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit --- End diff -- I think this can be reduced to ```xml @{argLine} -Xmx2800m ``` ---
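The reduction suggested in the review comments above can be sketched as the following pom.xml fragment. This is a hypothetical illustration: it assumes the JaCoCo `prepare-agent` goal writes the agent options into the `argLine` property, and `@{argLine}` is Surefire's late-replacement syntax that picks that property up at test time; the -Xmx value varies per module as in the diffs.

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <!-- @{argLine} is resolved late, after jacoco-maven-plugin's
         prepare-agent goal has set the argLine property -->
    <argLine>@{argLine} -Xmx1000m</argLine>
  </configuration>
</plugin>
```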
[GitHub] flink issue #2623: [FLINK-2608] Updated Twitter Chill version.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2623 Thank you! ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89161736 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/JobClient.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common; + +import java.util.Map; + +/* + * An Flink job client interface to interact with running Flink jobs. + */ +public interface JobClient { + + /** +* Gets the JobID associated with this JobClient. +*/ + JobID getJobID(); + + /** +* Returns a boolean indicating whether the job execution has finished. +*/ + boolean hasFinished() throws Exception; + + /** +* Blocks until the result of the job execution is returned. +*/ + JobExecutionResult waitForResult() throws Exception; + + /** +* Gets the accumulator map of a running job. +*/ + Map getAccumulators() throws Exception; + + /** +* Cancels a running job. +*/ + void cancel() throws Exception; + + /** +* Stops a running job if the job supports stopping. +*/ + void stop() throws Exception; + + /** +* Adds a Runnable to this JobClient to be called +* when the client is shut down. Runnables are called +* in the order they are added. 
+*/ + void addFinalizer(Runnable finalizer) throws Exception; + + /** +* Runs finalization code to shutdown the client +* and its dependencies. +*/ + void shutdown(); --- End diff -- Correct, let's see if we can solely dedicate the execution of shutdown to finalizers and shutdown hooks then. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
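The idea of dedicating shutdown entirely to finalizers and shutdown hooks could look roughly like the following sketch. This is not the PR's code; the class name `FinalizingClient` and its behavior (finalizers run exactly once, in registration order, whether `shutdown()` is called explicitly or via a JVM shutdown hook) are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch, not the PR's implementation: a client whose shutdown
 * consists solely of running registered finalizers, backed by a JVM shutdown
 * hook so cleanup also happens if the caller never invokes shutdown().
 */
class FinalizingClient {

    private final List<Runnable> finalizers = new ArrayList<>();
    private boolean shutdown = false;

    FinalizingClient() {
        // Safety net: run finalizers on JVM exit if shutdown() was never called.
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
    }

    synchronized void addFinalizer(Runnable finalizer) {
        finalizers.add(finalizer);
    }

    synchronized void shutdown() {
        if (shutdown) {
            return; // idempotent: the hook and an explicit call may both fire
        }
        shutdown = true;
        for (Runnable finalizer : finalizers) {
            try {
                // run finalizers in the order they were added
                finalizer.run();
            } catch (RuntimeException e) {
                // one failing finalizer must not prevent the others from running
                System.err.println("Finalizer failed: " + e.getMessage());
            }
        }
    }
}
```

Making `shutdown()` idempotent matters here, because the shutdown hook will fire at JVM exit even after an explicit shutdown.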
[GitHub] flink issue #2623: [FLINK-2608] Updated Twitter Chill version.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2623 The build fails during the Maven Rat plugin license check. Could you fix the build and rebase to the latest master? I think we can merge then. ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89141224 --- Diff: flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java --- @@ -172,29 +174,40 @@ public JobExecutionResult executePlan(Plan plan) throws Exception { // start the cluster for us start(); - } - else { + } else { // we use the existing session shutDownAtEnd = false; } - try { - Configuration configuration = this.flink.configuration(); + Configuration configuration = this.flink.configuration(); - Optimizer pc = new Optimizer(new DataStatistics(), configuration); - OptimizedPlan op = pc.compile(plan); + Optimizer pc = new Optimizer(new DataStatistics(), configuration); + OptimizedPlan op = pc.compile(plan); - JobGraphGenerator jgg = new JobGraphGenerator(configuration); - JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); + JobGraphGenerator jgg = new JobGraphGenerator(configuration); + JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); - boolean sysoutPrint = isPrintingStatusDuringExecution(); - return flink.submitJobAndWait(jobGraph, sysoutPrint); - } - finally { - if (shutDownAtEnd) { - stop(); + boolean sysoutPrint = isPrintingStatusDuringExecution(); + + --- End diff -- Thank you. ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89141153 --- Diff: flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java --- @@ -172,29 +174,40 @@ public JobExecutionResult executePlan(Plan plan) throws Exception { // start the cluster for us start(); - } - else { + } else { // we use the existing session shutDownAtEnd = false; } - try { - Configuration configuration = this.flink.configuration(); + Configuration configuration = this.flink.configuration(); - Optimizer pc = new Optimizer(new DataStatistics(), configuration); - OptimizedPlan op = pc.compile(plan); + Optimizer pc = new Optimizer(new DataStatistics(), configuration); + OptimizedPlan op = pc.compile(plan); - JobGraphGenerator jgg = new JobGraphGenerator(configuration); - JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); + JobGraphGenerator jgg = new JobGraphGenerator(configuration); + JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); - boolean sysoutPrint = isPrintingStatusDuringExecution(); - return flink.submitJobAndWait(jobGraph, sysoutPrint); - } - finally { - if (shutDownAtEnd) { - stop(); + boolean sysoutPrint = isPrintingStatusDuringExecution(); + + + JobListeningContext jobListeningContext = flink.submitJob(jobGraph, sysoutPrint); + JobClientEager jobClient = new JobClientEager(jobListeningContext); + + Runnable cleanup = new Runnable() { + @Override + public void run() { + if (shutDownAtEnd) { --- End diff -- We could but it wouldn't make any semantic difference since the enclosed variable must be final. ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89140934 --- Diff: flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java --- @@ -172,29 +174,40 @@ public JobExecutionResult executePlan(Plan plan) throws Exception { // start the cluster for us start(); - } - else { + } else { // we use the existing session shutDownAtEnd = false; } - try { - Configuration configuration = this.flink.configuration(); + Configuration configuration = this.flink.configuration(); - Optimizer pc = new Optimizer(new DataStatistics(), configuration); - OptimizedPlan op = pc.compile(plan); + Optimizer pc = new Optimizer(new DataStatistics(), configuration); + OptimizedPlan op = pc.compile(plan); - JobGraphGenerator jgg = new JobGraphGenerator(configuration); - JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); + JobGraphGenerator jgg = new JobGraphGenerator(configuration); + JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); - boolean sysoutPrint = isPrintingStatusDuringExecution(); - return flink.submitJobAndWait(jobGraph, sysoutPrint); - } - finally { - if (shutDownAtEnd) { - stop(); + boolean sysoutPrint = isPrintingStatusDuringExecution(); + + + JobListeningContext jobListeningContext = flink.submitJob(jobGraph, sysoutPrint); + JobClientEager jobClient = new JobClientEager(jobListeningContext); + + Runnable cleanup = new Runnable() { + @Override + public void run() { + if (shutDownAtEnd) { + try { + stop(); + } catch (Exception e) { + throw new RuntimeException("Failed to run cleanup", e); --- End diff -- Thanks, catching exceptions per finalizer would make sense. ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89139301 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.client.program; + +import org.apache.flink.api.common.JobClient; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobClientActorUtils; +import org.apache.flink.runtime.client.JobClientActor; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobListeningContext; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound; +import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults; +import org.apache.flink.util.SerializedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * A client to interact with a running Flink job. + */ +public class JobClientEager implements JobClient { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); --- End diff -- +1 ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89139085 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.client.program; + +import org.apache.flink.api.common.JobClient; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobClientActorUtils; +import org.apache.flink.runtime.client.JobClientActor; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobListeningContext; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound; +import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults; +import org.apache.flink.util.SerializedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * A client to interact with a running Flink job. + */ +public class JobClientEager implements JobClient { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + /** The Job's listening context for monitoring and job interaction */ + private final JobListeningContext jobListeningContext; + + /** Finalization code to run upon shutting down the JobClient */ + private final List finalizers; + + public JobClientEager(JobListeningContext jobListeningContext) { + this.jobListeningContext = jobListeningContext; + this.finalizers = new LinkedList<>(); --- End diff -- I don't think it really matters here. The intention was to use as little memory as possible here and since we access finalizers only once, performance is not so important. 
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89140321 --- Diff: flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java --- @@ -207,14 +207,23 @@ public JobExecutionResult executePlanWithJars(JobWithJars program) throws Except shutDownAtEnd = false; } - try { - return client.run(program, defaultParallelism).getJobExecutionResult(); - } - finally { - if (shutDownAtEnd) { - stop(); - } - } + final JobClient jobClient = client.run(program, defaultParallelism); + + jobClient.addFinalizer( + new Runnable() { + @Override + public void run() { + if (shutDownAtEnd) { + try { + stop(); + } catch (Exception e) { + throw new RuntimeException("Failed to clean up.", e); --- End diff -- Fine, then we need something like Runnable with a checked exception signature. ---
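A "Runnable with a checked exception signature", combined with the per-finalizer exception handling discussed elsewhere in this thread, might be sketched as follows. The names `CheckedFinalizer` and `FinalizerRunner` are made up for illustration and are not part of the PR:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative finalizer interface that, unlike Runnable, may throw checked exceptions. */
@FunctionalInterface
interface CheckedFinalizer {
    void run() throws Exception;
}

/** Runs all finalizers, catching exceptions per finalizer so one failure cannot skip the rest. */
class FinalizerRunner {

    static List<Exception> runAll(List<CheckedFinalizer> finalizers) {
        List<Exception> failures = new ArrayList<>();
        for (CheckedFinalizer finalizer : finalizers) {
            try {
                finalizer.run();
            } catch (Exception e) {
                // collect instead of rethrowing, so remaining finalizers still run
                failures.add(e);
            }
        }
        return failures;
    }
}
```

With such an interface, cleanup code like `stop()` above could throw its checked exception directly instead of wrapping it in a `RuntimeException`.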
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89139969 --- Diff: flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java --- @@ -207,14 +207,23 @@ public JobExecutionResult executePlanWithJars(JobWithJars program) throws Except shutDownAtEnd = false; } - try { - return client.run(program, defaultParallelism).getJobExecutionResult(); - } - finally { - if (shutDownAtEnd) { - stop(); - } - } + final JobClient jobClient = client.run(program, defaultParallelism); + + jobClient.addFinalizer( + new Runnable() { + @Override + public void run() { + if (shutDownAtEnd) { --- End diff -- This closure should be fine since Java demands the variable to be final. ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89138438 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/JobClientTest.java --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.program; + +import akka.dispatch.Futures; +import org.apache.flink.api.common.JobClient; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.JobListeningContext; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.util.SerializedValue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Promise; + +import java.util.Collections; + + +/** + * Tests the JobClient implementations. 
+ * + * See also: JobRetrievalITCase + */ +public class JobClientTest { + + private static boolean finalizeCalled; + + private JobListeningContext listeningContext; + private JobID jobID; + private JobManagerMessages.JobResultSuccess successMessage; + + private Runnable finalizer = new Runnable() { + @Override + public void run() { + finalizeCalled = true; + } + }; + + private Promise resultPromise; + + @Before + public void beforeTest() throws Exception { + finalizeCalled = false; + + this.jobID = JobID.generate(); + this.listeningContext = Mockito.mock(JobListeningContext.class); + this.resultPromise = Futures.promise(); + ActorGateway mockActorClientGateway = Mockito.mock(ActorGateway.class); + Mockito.when(listeningContext.getJobID()).thenReturn(jobID); + Mockito.when(listeningContext.getJobClientGateway()).thenReturn(mockActorClientGateway); + Mockito.when(listeningContext.getJobResultFuture()).thenReturn(resultPromise.future()); + Mockito.when(listeningContext.getClassLoader()).thenReturn(JobClientTest.class.getClassLoader()); + + this.successMessage = new JobManagerMessages.JobResultSuccess( + new SerializedJobExecutionResult( + jobID, + 42, + Collections.singletonMap("key", new SerializedValue("value")))); + } + + @Test(timeout = 1) --- End diff -- Just a safety timeout in case anything gets stuck. Doesn't hurt the test case, does it? ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89138090 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/JobClient.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common; + +import java.util.Map; + +/* + * An Flink job client interface to interact with running Flink jobs. + */ +public interface JobClient { --- End diff -- +1 I could pass the `ClusterClient` to the `JobClient`. I thought I would avoid that because it would also expose the ClusterClient from the regular Java API, which is generally agnostic of job submission and cluster management. ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89137404 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/JobClient.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common; + +import java.util.Map; + +/* + * An Flink job client interface to interact with running Flink jobs. + */ +public interface JobClient { + + /** +* Gets the JobID associated with this JobClient. +*/ + JobID getJobID(); + + /** +* Returns a boolean indicating whether the job execution has finished. +*/ + boolean hasFinished() throws Exception; + + /** +* Blocks until the result of the job execution is returned. +*/ + JobExecutionResult waitForResult() throws Exception; + + /** +* Gets the accumulator map of a running job. +*/ + Map getAccumulators() throws Exception; + + /** +* Cancels a running job. +*/ + void cancel() throws Exception; + + /** +* Stops a running job if the job supports stopping. +*/ + void stop() throws Exception; + + /** +* Adds a Runnable to this JobClient to be called +* when the client is shut down. Runnables are called +* in the order they are added. 
+*/ + void addFinalizer(Runnable finalizer) throws Exception; --- End diff -- Yes, that's an issue with sharing the interface across modules. Let me try to get rid of it for the base interface. ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89137049 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java --- @@ -911,6 +912,24 @@ public JobExecutionResult execute() throws Exception { public abstract JobExecutionResult execute(String jobName) throws Exception; /** +* Triggers the program execution, just like {@code execute()} but does not block. +* Instead, it returns a JobClient which can be used to interact with the running job. +* @return A JobClient for job interaction. +* @throws Exception Thrown if the program submission fails. +*/ + public JobClient executeWithControl() throws Exception { + return executeWithControl(getDefaultName()); + } + + /** +* Triggers the program execution, just like {@code execute(String jobName)} but does not block. +* Instead, it returns a JobClient which can be used to interact with the running job. +* @return A JobClient for job interaction. +* @throws Exception Thrown if the program submission fails. +*/ + public abstract JobClient executeWithControl(String jobName) throws Exception; --- End diff -- Maybe `executeWithClient`? ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89137469 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/JobClient.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common; + +import java.util.Map; + +/* + * An Flink job client interface to interact with running Flink jobs. + */ +public interface JobClient { + + /** +* Gets the JobID associated with this JobClient. +*/ + JobID getJobID(); + + /** +* Returns a boolean indicating whether the job execution has finished. +*/ + boolean hasFinished() throws Exception; + + /** +* Blocks until the result of the job execution is returned. +*/ + JobExecutionResult waitForResult() throws Exception; + + /** +* Gets the accumulator map of a running job. +*/ + Map getAccumulators() throws Exception; + + /** +* Cancels a running job. +*/ + void cancel() throws Exception; + + /** +* Stops a running job if the job supports stopping. +*/ + void stop() throws Exception; + + /** +* Adds a Runnable to this JobClient to be called +* when the client is shut down. Runnables are called +* in the order they are added. --- End diff -- Ay! 
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89136864 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java --- @@ -911,6 +912,24 @@ public JobExecutionResult execute() throws Exception { public abstract JobExecutionResult execute(String jobName) throws Exception; /** +* Triggers the program execution, just like {@code execute()} but does not block. +* Instead, it returns a JobClient which can be used to interact with the running job. +* @return A JobClient for job interaction. +* @throws Exception Thrown if the program submission fails. +*/ + public JobClient executeWithControl() throws Exception { + return executeWithControl(getDefaultName()); + } + + /** +* Triggers the program execution, just like {@code execute(String jobName)} but does not block. +* Instead, it returns a JobClient which can be used to interact with the running job. +* @return A JobClient for job interaction. +* @throws Exception Thrown if the program submission fails. +*/ + public abstract JobClient executeWithControl(String jobName) throws Exception; --- End diff -- Names for new concepts are always hard. I don't think `executeAttached` is any better than `executeWithControl`. `submitJob` would introduce the job concept into the API. I think the new name should definitely start with `execute` to follow the existing convention and be easily discoverable. ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89136345 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java --- @@ -333,4 +354,31 @@ protected boolean isClientConnected() { return client != ActorRef.noSender(); } + public static class ClientMessage implements Serializable { + + private Object msg; + + public ClientMessage(Object msg) { + this.msg = msg; + } + + public Object getMsg() { + return msg; + } + } + + private static class ClientMessageWithSender extends ClientMessage { --- End diff -- Will add ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89136326 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java --- @@ -333,4 +354,31 @@ protected boolean isClientConnected() { return client != ActorRef.noSender(); } + public static class ClientMessage implements Serializable { + + private Object msg; + + public ClientMessage(Object msg) { + this.msg = msg; --- End diff -- Good idea. ---
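The reviewer's suggestion on the `this.msg = msg;` line is not quoted in this thread, so the following is only a guess at what was agreed to: a common review comment on such a wrapper is to make the field final (and reject null), yielding an immutable message envelope. A standalone sketch, not the class in JobClientActor:

```java
import java.io.Serializable;
import java.util.Objects;

/**
 * Illustrative standalone variant of the ClientMessage wrapper, with a
 * guessed-at version of the review suggestion applied: the wrapped message
 * is a final, non-null field, so the envelope is immutable once built.
 */
class ClientMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    private final Object msg;

    ClientMessage(Object msg) {
        // fail fast on null instead of carrying a null payload between actors
        this.msg = Objects.requireNonNull(msg, "msg");
    }

    Object getMsg() {
        return msg;
    }
}
```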
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89136299 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorUtils.java --- @@ -101,11 +93,37 @@ public static JobListeningContext submitJob( FiniteDuration timeout, boolean sysoutLogUpdates, ClassLoader classLoader) { + return submitJob(actorSystem, + config, + leaderRetrievalService, + jobGraph, + timeout, + sysoutLogUpdates, + null, + classLoader); + } + /** +* Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be +* passed to {@code awaitJobResult} to get the result of the submission. --- End diff -- Ay! ---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89135846

--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---
@@ -634,6 +634,25 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def execute(jobName: String) = javaEnv.execute(jobName)

   /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   *
+   * The program execution will be logged and displayed with a generated
+   * default name.
+   */
+  def executeWithControl() = javaEnv.executeWithControl()
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   *
+   * The program execution will be logged and displayed with the provided name.
--- End diff --

Ay!
---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89135855

--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
@@ -652,6 +652,34 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }

   /**
+   * Triggers the program execution. The environment will execute all parts of the program that have
+   * resulted in a "sink" operation. Sink operations are for example printing results
+   * [[DataSet.print]], writing results (e.g. [[DataSet.writeAsText]], [[DataSet.write]], or other
+   * generic data sinks created with [[DataSet.output]].
+   *
+   * The program execution will be logged and displayed with a generated default name.
+   *
+   * @return The job client of the execution to interact with the running job.
+   */
+  def executeWithControl(): JobClient = {
+    javaEnv.executeWithControl()
+  }
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of the program that have
+   * resulted in a "sink" operation. Sink operations are for example printing results
+   * [[DataSet.print]], writing results (e.g. [[DataSet.writeAsText]], [[DataSet.write]], or other
+   * generic data sinks created with [[DataSet.output]].
+   *
+   * The program execution will be logged and displayed with the given name.
+   *
+   * @return The job client of the execution to interact with the running job.
--- End diff --

Ay!
---
[GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r89135481

--- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java ---
@@ -91,11 +91,9 @@ public void testFaultyAccumulator() throws Exception {
 		try {
 			env.execute();
 			fail("Should have failed.");
-		} catch (ProgramInvocationException e) {
-			Assert.assertTrue("Exception should be passed:",
-				e.getCause() instanceof JobExecutionException);
+		} catch (JobExecutionException e) {
--- End diff --

`ProgramInvocationException` is thrown during invocation of the user program. `JobExecutionException` is thrown while the job is executing. That makes much more sense IMHO. The API doesn't strictly declare any exceptions (`throws Exception`), so we're not breaking anything.
---
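The test change above narrows the caught type: instead of catching a wrapping exception and inspecting its cause, the call site catches the job-level failure directly. A self-contained sketch of the difference (the exception class here is a stand-in, not Flink's actual `JobExecutionException`):

```java
// Illustrates catching a specific exception type directly rather than
// unwrapping it from a wrapper via getCause(). The exception class below
// is an illustrative stand-in, not Flink's JobExecutionException.
public class ExceptionNarrowing {

    static class JobExecutionException extends Exception {
        JobExecutionException(String msg) { super(msg); }
    }

    /** Simulates env.execute() failing while the job is running. */
    static void execute() throws JobExecutionException {
        throw new JobExecutionException("faulty accumulator");
    }

    public static void main(String[] args) {
        try {
            execute();
            throw new AssertionError("Should have failed.");
        } catch (JobExecutionException e) {
            // The job-level failure surfaces directly; no need to assert
            // that e.getCause() is an instance of the expected type.
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Callers that care about the failure get a precise type to catch, while callers that only declare `throws Exception` (as the Flink execute API does) are unaffected.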
[GitHub] flink issue #2732: [FLINK-4272] Create a JobClient for job control and monit...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2732 Thank you for your comments @tillrohrmann and @aljoscha. I'll make changes and get back to you. ---