[GitHub] flink pull request: Rename coGroupDataSet.scala to CoGroupDataSet....
GitHub user hsaputra opened a pull request: https://github.com/apache/flink/pull/324 Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala to CrossDataSet.scala This PR contains changes to follow Scala style: -) Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala to CrossDataSet.scala -) Move the UnfinishedCoGroupOperation class into its own Scala file You can merge this pull request into a Git repository by running: $ git pull https://github.com/hsaputra/flink rename_coGroupDataSet_filename Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/324.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #324 commit 85d0dbfb506b954c53ece5ff8f825df5fbde1ed8 Author: Henry Saputra henry.sapu...@gmail.com Date: 2015-01-19T22:52:30Z Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala to CrossDataSet.scala commit fa9f37c189e397458df4afd89af4a0025373ec84 Author: Henry Saputra henry.sapu...@gmail.com Date: 2015-01-19T23:29:51Z Move the UnfinishedCoGroupOperation class into its own Scala file. The UnfinishedCoGroupOperation class is not closely related to CoGroupOperation via the sealed modifier, so per the Scala style guide [1] I propose moving it to a separate file. [1] http://docs.scala-lang.org/style/files.html --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1376] [runtime] Add proper shared slot ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/318#discussion_r23195339

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class represents a shared slot. A shared slot can have multiple
+ * {@link org.apache.flink.runtime.instance.SimpleSlot} instances within itself. This allows
+ * scheduling multiple tasks simultaneously, enabling Flink's streaming capabilities.
+ *
+ * IMPORTANT: This class contains no synchronization. Thus, the caller has to guarantee proper
+ * synchronization. In the current implementation, all concurrently modifying operations are
+ * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
+ * synchronization.
+ */
+public class SharedSlot extends Slot {
+
+	private final SlotSharingGroupAssignment assignmentGroup;
+
+	private final Set<Slot> subSlots;
+
+	public SharedSlot(JobID jobID, Instance instance, int slotNumber,
+			SlotSharingGroupAssignment assignmentGroup, SharedSlot parent,
+			AbstractID groupID) {
+		super(jobID, instance, slotNumber, parent, groupID);
+
+		this.assignmentGroup = assignmentGroup;
+		this.subSlots = new HashSet<Slot>();
+	}
+
+	public Set<Slot> getSubSlots() {
+		return subSlots;
+	}
+
+	/**
+	 * Removes the simple slot from the {@link org.apache.flink.runtime.instance.SharedSlot}. Should
+	 * only be called through the
+	 * {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} attribute
+	 * assignmentGroup.
+	 *
+	 * @param slot slot to be removed from the set of sub slots.
+	 * @return Number of remaining sub slots
+	 */
+	public int freeSubSlot(Slot slot){
+		if(!subSlots.remove(slot)){
+			throw new IllegalArgumentException("Wrong shared slot for sub slot.");
+		}
+
+		return subSlots.size();
+	}
+
+	@Override
+	public int getNumberLeaves() {
+		int result = 0;
+
+		for(Slot slot: subSlots){
+			result += slot.getNumberLeaves();
+		}
+
+		return result;
+	}
+
+	@Override
+	public void cancel() {
+		// Guarantee that the operation is only executed once
+		if (markCancelled()) {
+			assignmentGroup.releaseSharedSlot(this);
+		}
+	}
+
+	/**
+	 * Release this shared slot. In order to do this:
+	 *
+	 * 1. Cancel and release all sub slots atomically with respect to the assigned assignment group.
+	 * 2. Set the state of the shared slot to be cancelled.
+	 * 3. Dispose the shared slot (returning the slot to the instance).
+	 *
+	 * After cancelAndReleaseSubSlots, the shared slot is marked to be dead. This prevents further
+	 * sub slot creation by the scheduler.
+	 */
+	@Override
+	public void releaseSlot() {
+		assignmentGroup.releaseSharedSlot(this);
+	}
+
+	/**
+	 * Creates a new sub slot if the slot is not dead yet. This method should only be called from
+	 * the assignment group instance to guarantee synchronization.
+	 *
+	 * @param jID id to identify tasks which can be deployed in this sub slot
+	 * @return new sub slot if the shared slot is still alive, otherwise null
+	 */
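The synchronization contract described in the class comment (the slot itself holds no locks; every concurrent modification is funneled through a single coordinating object) can be sketched with a toy example. This is a hypothetical illustration, not Flink code; all names here are made up.

```java
import java.util.HashSet;
import java.util.Set;

// Unsynchronized container: mirrors SharedSlot's "no synchronization here" contract.
class ToySlot {
    final Set<String> subSlots = new HashSet<>();

    int removeSubSlot(String id) {
        if (!subSlots.remove(id)) {
            throw new IllegalArgumentException("Wrong shared slot for sub slot.");
        }
        return subSlots.size();
    }
}

// Coordinator: the only entry point for concurrent modifications, so the
// slot class itself can stay lock-free (analogous to the assignment group).
class ToyAssignmentGroup {
    private final Object lock = new Object();

    int release(ToySlot slot, String id) {
        synchronized (lock) {
            return slot.removeSubSlot(id);
        }
    }
}

public class SharedSlotSketch {
    public static void main(String[] args) {
        ToySlot slot = new ToySlot();
        slot.subSlots.add("a");
        slot.subSlots.add("b");
        ToyAssignmentGroup group = new ToyAssignmentGroup();
        System.out.println(group.release(slot, "a")); // prints 1: one sub slot remains
    }
}
```

The design choice being illustrated: pushing all locking into one object keeps the per-slot code simple and makes the locking discipline auditable in a single place.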
[jira] [Commented] (FLINK-1372) TaskManager and JobManager do not log startup settings any more
[ https://issues.apache.org/jira/browse/FLINK-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282593#comment-14282593 ] Till Rohrmann commented on FLINK-1372: -- It always depends on the location from which you log. If you log the startup info from within an actor, you should use the actor's logging object. But I think that we can get rid of the akka.loglevel config parameter so that logging will only be controlled by the logging properties file. TaskManager and JobManager do not log startup settings any more --- Key: FLINK-1372 URL: https://issues.apache.org/jira/browse/FLINK-1372 Project: Flink Issue Type: Bug Components: JobManager, TaskManager Affects Versions: 0.9 Reporter: Stephan Ewen Assignee: Till Rohrmann Fix For: 0.9 In prior versions, the jobmanager and taskmanager logged a lot of startup options: - Environment - ports - memory configuration - network configuration Currently, they log very little. We should add the logging back in. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/311#issuecomment-70517390 IMO, users should only be allowed to set semantic properties through field expression strings. There should be no need to implement their own SemanticProperty class or manually set the forwarded fields in SemanticProperty objects. Setting these fields is not trivial because field indices need to be flattened and the types of source and target fields must be checked for compatibility. From my point of view, this is clearly a developer API. Whoever gets in touch with it should not have problems dealing with input ids or implementing the backwards access. The {{hasFieldForwardInfomation()}} can be removed though. Will do that.
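As a rough illustration of why field expression strings are the friendlier interface: a string such as "f0; f2 -> f1" can be parsed by the framework into (source, target) index pairs, so users never build SemanticProperty objects by hand. The grammar, names, and parsing below are assumptions for illustration only, not Flink's actual parser (which also has to flatten nested field indices and check type compatibility).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: turn a forwarded-fields expression like "f0; f2 -> f1"
// into (sourceIndex, targetIndex) pairs. "f0" alone means the field keeps
// its position; "f2 -> f1" means source field 2 is forwarded to target field 1.
public class ForwardedFieldsSketch {
    static List<int[]> parse(String expr) {
        List<int[]> pairs = new ArrayList<>();
        for (String part : expr.split(";")) {
            String[] sides = part.split("->");
            int src = Integer.parseInt(sides[0].trim().substring(1));
            int dst = sides.length > 1 ? Integer.parseInt(sides[1].trim().substring(1)) : src;
            pairs.add(new int[]{src, dst});
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (int[] p : parse("f0; f2 -> f1")) {
            System.out.println(p[0] + " -> " + p[1]); // prints "0 -> 0" then "2 -> 1"
        }
    }
}
```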
[GitHub] flink pull request: [FLINK-1372] [runtime] Fix akka logging
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/321 [FLINK-1372] [runtime] Fix akka logging Fixes logging settings. The logging is now exclusively controlled by the logging properties provided to the system; the akka.loglevel config parameter is thus removed. The JobManager and TaskManager now log their startup settings properly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixAkkaLogging Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/321.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #321 commit 94d96cb6574ada07296319f19b05283055f3029b Author: Till Rohrmann trohrm...@apache.org Date: 2015-01-19T15:13:52Z [FLINK-1372] [runtime] Fixes logging settings. The logging is now exclusively controlled by the logging properties provided to the system. Removes akka.loglevel config parameter.
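To illustrate what "exclusively controlled by the logging properties" means in practice: with the akka.loglevel key gone, a single logging configuration file decides the log level for everything, including the actor system. The fragment below is a hypothetical minimal log4j.properties along those lines, not the file shipped with Flink.

```properties
# Hypothetical minimal log4j.properties: the one place that now decides
# what the JobManager/TaskManager (and Akka) log, with no separate
# akka.loglevel configuration key.
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```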
[jira] [Updated] (FLINK-1296) Add support for very large record for sorting
[ https://issues.apache.org/jira/browse/FLINK-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-1296: -- Fix Version/s: (was: 0.8) 0.9 Add support for very large record for sorting - Key: FLINK-1296 URL: https://issues.apache.org/jira/browse/FLINK-1296 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 0.8 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 0.9 Currently, very large records (multiple hundreds of megabytes) can break the sorter if they overflow the sort buffer. Furthermore, if a merge of those records is attempted, pulling multiple of them into memory concurrently can exhaust the machine's memory.
[jira] [Updated] (FLINK-937) Change the YARN Client to allocate all cluster resources, if no argument given
[ https://issues.apache.org/jira/browse/FLINK-937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-937: - Fix Version/s: (was: 0.8) 0.9 Change the YARN Client to allocate all cluster resources, if no argument given -- Key: FLINK-937 URL: https://issues.apache.org/jira/browse/FLINK-937 Project: Flink Issue Type: Improvement Components: YARN Client Reporter: Robert Metzger Assignee: Robert Metzger Priority: Minor Fix For: 0.9 In order to further improve the user experience, I would like to change the YARN client's behavior to allocate as many cluster resources as possible, if the user does not specify differently. The majority of users have exclusive access to the cluster.
[jira] [Updated] (FLINK-1278) Remove the Record special code paths
[ https://issues.apache.org/jira/browse/FLINK-1278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-1278: -- Fix Version/s: (was: 0.8) 0.9 Remove the Record special code paths Key: FLINK-1278 URL: https://issues.apache.org/jira/browse/FLINK-1278 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 0.8 Reporter: Stephan Ewen Assignee: Kostas Tzoumas Fix For: 0.9 There are some legacy Record code paths in the runtime, which are often not kept in sync and cause errors if people actually use Records.
[jira] [Updated] (FLINK-1297) Add support for tracking statistics of intermediate results
[ https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-1297: -- Fix Version/s: (was: 0.8) 0.9 Add support for tracking statistics of intermediate results --- Key: FLINK-1297 URL: https://issues.apache.org/jira/browse/FLINK-1297 Project: Flink Issue Type: Improvement Components: Distributed Runtime Reporter: Alexander Alexandrov Assignee: Alexander Alexandrov Fix For: 0.9 Original Estimate: 1,008h Remaining Estimate: 1,008h One of the major problems related to the optimizer at the moment is the lack of proper statistics. With the introduction of staged execution, it is possible to instrument the runtime code with a statistics facility that collects the required information for optimizing the next execution stage. I would therefore like to contribute code that can be used to gather basic statistics for the (intermediate) result of dataflows (e.g. min, max, count, count distinct) and make them available to the job manager. Before I start, I would like to hear some feedback from the other users. In particular, to handle skew (e.g. on grouping) it might be good to have some sort of detailed sketch about the key distribution of an intermediate result. I am not sure whether a simple histogram is the most effective way to go. Maybe somebody would propose another lightweight sketch that provides better accuracy.
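The kind of per-result statistics proposed above (min, max, count, count distinct, plus a key histogram that would reveal skew) can be sketched as a single-pass collector. This is an illustrative toy under the assumption of integer keys; a production version would replace the exact set and map with lightweight sketches (e.g. HyperLogLog for count-distinct, count-min for the histogram) to bound memory.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

// Hypothetical single-pass statistics collector for an intermediate result.
public class ResultStatsSketch {
    long count = 0;
    int min = Integer.MAX_VALUE;
    int max = Integer.MIN_VALUE;
    final HashSet<Integer> distinct = new HashSet<>();       // exact count-distinct
    final Map<Integer, Long> histogram = new HashMap<>();    // exact key histogram

    void add(int key) {
        count++;
        min = Math.min(min, key);
        max = Math.max(max, key);
        distinct.add(key);
        histogram.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        ResultStatsSketch s = new ResultStatsSketch();
        for (int k : new int[]{1, 1, 1, 7, 3}) {
            s.add(k);
        }
        // Key 1 dominates the histogram, which is exactly the grouping skew
        // such statistics would expose to the optimizer.
        System.out.println(s.count + " " + s.min + " " + s.max + " " + s.distinct.size());
    }
}
```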
[jira] [Updated] (FLINK-938) Change start-cluster.sh script so that users don't have to configure the JobManager address
[ https://issues.apache.org/jira/browse/FLINK-938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-938: - Fix Version/s: (was: 0.8) 0.9 Change start-cluster.sh script so that users don't have to configure the JobManager address --- Key: FLINK-938 URL: https://issues.apache.org/jira/browse/FLINK-938 Project: Flink Issue Type: Improvement Components: Build System Reporter: Robert Metzger Assignee: Mingliang Qi Priority: Minor Fix For: 0.9 To improve the user experience, Flink should not require users to configure the JobManager's address on a cluster. In combination with FLINK-934, this would allow running Flink with decent performance on a cluster without setting a single configuration value.
[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/292#discussion_r23169293

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -860,8 +910,50 @@ protected Properties getYarnProperties() throws IOException {
 		return yarnProperties;
 	}
 
-	protected Client getClient(CommandLine line, ClassLoader classLoader) throws IOException {
-		return new Client(getJobManagerAddress(line), getGlobalConfiguration(), classLoader);
+	protected Client getClient(CommandLine line, ClassLoader classLoader, String programName) throws IOException {
+		String jmAddrString = getJobManagerAddressString(line);
+		InetSocketAddress jobManagerAddress = null;
+		if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) {
+			System.out.println("YARN cluster mode detected. Switching Log4j output to console");
+			LogManager.getRootLogger().addAppender(new ConsoleAppender(new PatternLayout(DEFAULT_LOG4J_PATTERN_LAYOUT)));
--- End diff --

Do we have to hardwire log4j into our code? That contradicts slf4j's purpose. Would be great if we could get rid of that.
[GitHub] flink pull request: [FLINK-1406] update Flink compatibility notice
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/314#issuecomment-70519951 Good to merge
[GitHub] flink pull request: [FLINK-1392] Add Kryo serializer for Protobuf
GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/322 [FLINK-1392] Add Kryo serializer for Protobuf I've checked the added dependencies: it's not overriding any versions, and no transitive dependencies are added. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink1392 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/322.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #322 commit ed629e3e23001a0761d116d8c1151a65d88501eb Author: Robert Metzger rmetz...@apache.org Date: 2015-01-13T09:21:29Z [FLINK-1392] Add Kryo serializer for Protobuf
[GitHub] flink pull request: [FLINK-1003] Added spread out scheduling strat...
Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/60
[jira] [Assigned] (FLINK-1003) Spread out scheduling strategy
[ https://issues.apache.org/jira/browse/FLINK-1003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-1003: Assignee: (was: Till Rohrmann) Spread out scheduling strategy -- Key: FLINK-1003 URL: https://issues.apache.org/jira/browse/FLINK-1003 Project: Flink Issue Type: Improvement Reporter: Till Rohrmann Currently the Flink scheduler tries to fill one instance completely before the tasks are deployed to another instance. This is a good behaviour in multi-user and multi-job scenarios but it wastes resources if one wants to use the complete cluster. Therefore, another scheduling strategy where the load among the different instances is kept balanced might be useful. This spread out strategy will deploy the tasks such that the overall work is equally distributed.
[jira] [Commented] (FLINK-1418) Make 'print()' output on the client command line, rather than on the task manager sysout
[ https://issues.apache.org/jira/browse/FLINK-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282360#comment-14282360 ] Fabian Hueske commented on FLINK-1418: -- I agree, printing to the TMs' std-out is pretty pointless and doesn't help anybody, and local testing will not be affected. However, this change should come with a big warning sign in the documentation and JavaDocs because it might cause huge results to be transferred to the client. Make 'print()' output on the client command line, rather than on the task manager sysout Key: FLINK-1418 URL: https://issues.apache.org/jira/browse/FLINK-1418 Project: Flink Issue Type: Improvement Components: Java API Affects Versions: 0.9 Reporter: Stephan Ewen Right now, the {{print()}} command prints inside the data sinks where the code runs. It should pull data back to the client and print it there.
[jira] [Commented] (FLINK-1419) DistributedCache doesn't preserver files for subsequent operations
[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282393#comment-14282393 ] Fabian Hueske commented on FLINK-1419: -- I think it is fine to make files in the DC immutable (read-only). An operator that wants to modify files can create a local writable copy. Files in the DC should only be copied once per TM and stay until the job is finished, IMO. The question is who initiates the copy process. The first task that requires the file? In that case, all other tasks need to recognize that the file is being copied by another task and wait until the copying is completed. DistributedCache doesn't preserver files for subsequent operations -- Key: FLINK-1419 URL: https://issues.apache.org/jira/browse/FLINK-1419 Project: Flink Issue Type: Bug Affects Versions: 0.8, 0.9 Reporter: Chesnay Schepler Assignee: Chesnay Schepler When subsequent operations want to access the same files in the DC, it frequently happens that the files are not created for the following operation. This is fairly odd, since the DC is supposed to either a) preserve files when another operation kicks in within a certain time window, or b) just recreate the deleted files. Neither happens, and increasing the time window had no effect. I'd like to use this issue as a starting point for a more general discussion about the DistributedCache. Currently: 1. all files reside in a common job-specific directory 2. files are deleted during the job. One thing that was brought up about Trait #1 is that it basically forbids modification of the files, concurrent access and all. Personally I'm not sure if this is a problem. Changing it to a task-specific place solved the issue though. I'm more concerned about Trait #2. Besides the mentioned issue, the deletion is realized with the scheduler, which adds a lot of complexity to the current code. (It really is a pain to work on...) If we moved the deletion to the end of the job, it could be done as a clean-up step in the TaskManager. With this we could reduce the DC to a cacheFile(String source) method, the delete method in the TM, and throw out everything else. Also, the current implementation implies that big files may be copied multiple times. This may be undesired, depending on how big the files are.
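The open question in the comment above (the first task to request a file performs the copy, all later tasks wait until the copy completes) is a classic copy-once pattern. The sketch below illustrates it with a latch per file; it is a hypothetical toy, with no relation to the actual DistributedCache code, and the local path is a stand-in for a real file copy.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical copy-once cache: the winner of the putIfAbsent race copies the
// file once per TaskManager; every other task blocks until the copy is done.
public class CacheCopySketch {
    static class Entry {
        final CountDownLatch ready = new CountDownLatch(1);
        volatile String localPath;
    }

    final ConcurrentHashMap<String, Entry> files = new ConcurrentHashMap<>();

    String getFile(String name) {
        Entry fresh = new Entry();
        Entry existing = files.putIfAbsent(name, fresh);
        if (existing == null) {
            // This task won the race: perform the (simulated) copy exactly once.
            fresh.localPath = "/tmp/cache/" + name;
            fresh.ready.countDown();
            return fresh.localPath;
        }
        try {
            existing.ready.await(); // all other tasks wait for the first copy
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return existing.localPath;
    }

    public static void main(String[] args) {
        CacheCopySketch cache = new CacheCopySketch();
        System.out.println(cache.getFile("words.txt")); // first call copies
        System.out.println(cache.getFile("words.txt")); // second call reuses
    }
}
```

Deletion then becomes a single cleanup step at end of job (drop the map and the local directory), matching the simplification proposed in the comment.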
[GitHub] flink pull request: [FLINK-1165] No createCollectionsEnvironment i...
GitHub user ajaybhat opened a pull request: https://github.com/apache/flink/pull/320 [FLINK-1165] No createCollectionsEnvironment in Java API This commit adds a new method to ExecutionEnvironment to create a CollectionEnvironment, and applies the method to cases where a CollectionEnvironment() may be needed You can merge this pull request into a Git repository by running: $ git pull https://github.com/ajaybhat/flink FLINK-1165 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/320.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #320 commit baace5236340831789cddffaa75865f3c5dc3bb3 Author: Ajay Bhat ajayb...@air-watch.com Date: 2015-01-19T10:45:24Z [FLINK-1165] No createCollectionsEnvironment in Java API This commit adds a new method to ExecutionEnvironment to create a CollectionEnvironment, and applies the method to cases where a CollectionEnvironment() may be needed
[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/292#discussion_r23169866

--- Diff: flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Class handling the command line interface to the YARN session.
+ */
+public class FlinkYarnSessionCli {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
+
+	// Constants -
+
+	private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
+	public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
+	public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
+
+	private static final int CLIENT_POLLING_INTERVALL = 3;
+
+	// Command Line argument options -
+	// the prefix transformation is used by the CliFrontend static constructor.
+	private final Option QUERY;
+	// --- or ---
+	private final Option QUEUE;
+	private final Option SHIP_PATH;
+	private final Option FLINK_JAR;
+	private final Option JM_MEMORY;
+	private final Option TM_MEMORY;
+	private final Option CONTAINER;
+	private final Option SLOTS;
+
+	/**
+	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
+	 * -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
+	 */
+	private final Option DYNAMIC_PROPERTIES;
+
+	private AbstractFlinkYarnCluster yarnCluster = null;
+
+	public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
+		QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
+		QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
+		SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
+		FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
+		JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+		TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
+		CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
+		SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
+		DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic
[jira] [Commented] (FLINK-1400) In local mode, the default TaskManager won't listen on the data port.
[ https://issues.apache.org/jira/browse/FLINK-1400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282690#comment-14282690 ] Stephan Ewen commented on FLINK-1400: - Good point. The Webfrontend could write {{-1}} or {{N/A}} for the data port. If you want, open a separate issue for that and if you want to make a patch, let us know. In local mode, the default TaskManager won't listen on the data port. - Key: FLINK-1400 URL: https://issues.apache.org/jira/browse/FLINK-1400 Project: Flink Issue Type: Bug Affects Versions: 0.9 Environment: Ubuntu 14.04 LTS Reporter: Sergey Dudoladov Priority: Minor The TaskManager automatically started by the JobManager (JobManager.scala, appr. line 470) in local mode does not listen on the data port. To reproduce: 1) Start Flink via ./start-local.sh 2) Look up the data port number on localhost:8081 - Task Managers tab 3) sudo netstat -taupen | grep dataport_number Or start a second TaskManager and run Flink with a degree of parallelism of 2 (assuming one slot per TaskManager): 4) ./flink run -p 2 ... TaskManagers started via ./taskmanager.sh work fine.
[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/292#discussion_r23171500

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java ---
@@ -86,7 +86,7 @@ public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) th
 			case SLOW_CONNECT:
 				boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
 				if (correct) {
-					LOG.info("Determined " + i + " as the TaskTracker's own IP address");
+					LOG.info("Determined " + i + " as the machine's own IP address");
--- End diff --

Same here with the logging string.
[jira] [Commented] (FLINK-1372) TaskManager and JobManager do not log startup settings any more
[ https://issues.apache.org/jira/browse/FLINK-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282739#comment-14282739 ] Till Rohrmann commented on FLINK-1372: -- I think it should work now with my latest PR. TaskManager and JobManager do not log startup settings any more --- Key: FLINK-1372 URL: https://issues.apache.org/jira/browse/FLINK-1372 Project: Flink Issue Type: Bug Components: JobManager, TaskManager Affects Versions: 0.9 Reporter: Stephan Ewen Assignee: Till Rohrmann Fix For: 0.9 In prior versions, the jobmanager and taskmanager logged a lot of startup options: - Environment - ports - memory configuration - network configuration Currently, they log very little. We should add the logging back in.