[GitHub] flink pull request: Rename coGroupDataSet.scala to CoGroupDataSet....

2015-01-19 Thread hsaputra
GitHub user hsaputra opened a pull request:

https://github.com/apache/flink/pull/324

Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala 
to CrossDataSet.scala

This PR contains changes to follow Scala style:
-) Rename coGroupDataSet.scala to CoGroupDataSet.scala, and 
crossDataSet.scala to CrossDataSet.scala
-) Move the UnfinishedCoGroupOperation class into its own Scala file

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hsaputra/flink rename_coGroupDataSet_filename

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #324


commit 85d0dbfb506b954c53ece5ff8f825df5fbde1ed8
Author: Henry Saputra henry.sapu...@gmail.com
Date:   2015-01-19T22:52:30Z

Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala 
to CrossDataSet.scala

commit fa9f37c189e397458df4afd89af4a0025373ec84
Author: Henry Saputra henry.sapu...@gmail.com
Date:   2015-01-19T23:29:51Z

Move the UnfinishedCoGroupOperation class into its own Scala file.

The UnfinishedCoGroupOperation class is not closely related to CoGroupOperation
via a sealed modifier, so per the Scala style guide [1] I propose moving it to a
separate file.

[1] http://docs.scala-lang.org/style/files.html




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1376] [runtime] Add proper shared slot ...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/318#discussion_r23195339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class represents a shared slot. A shared slot can have multiple
+ * {@link org.apache.flink.runtime.instance.SimpleSlot} instances within itself. This allows
+ * scheduling multiple tasks simultaneously, enabling Flink's streaming capabilities.
+ *
+ * IMPORTANT: This class contains no synchronization. Thus, the caller has to guarantee proper
+ * synchronization. In the current implementation, all concurrently modifying operations are
+ * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
+ * synchronization.
+ */
+public class SharedSlot extends Slot {
+
+   private final SlotSharingGroupAssignment assignmentGroup;
+
+   private final Set<Slot> subSlots;
+
+   public SharedSlot(JobID jobID, Instance instance, int slotNumber,
+         SlotSharingGroupAssignment assignmentGroup, SharedSlot parent,
+         AbstractID groupID) {
+      super(jobID, instance, slotNumber, parent, groupID);
+
+      this.assignmentGroup = assignmentGroup;
+      this.subSlots = new HashSet<Slot>();
+   }
+
+   public Set<Slot> getSubSlots() {
+      return subSlots;
+   }
+
+   /**
+    * Removes the simple slot from the {@link org.apache.flink.runtime.instance.SharedSlot}. Should
+    * only be called through the
+    * {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} attribute
+    * assignmentGroup.
+    *
+    * @param slot slot to be removed from the set of sub slots.
+    * @return Number of remaining sub slots
+    */
+   public int freeSubSlot(Slot slot){
+      if(!subSlots.remove(slot)){
+         throw new IllegalArgumentException("Wrong shared slot for sub slot.");
+      }
+
+      return subSlots.size();
+   }
+
+   @Override
+   public int getNumberLeaves() {
+   int result = 0;
+
+   for(Slot slot: subSlots){
+   result += slot.getNumberLeaves();
+   }
+
+   return result;
+   }
+
+   @Override
+   public void cancel() {
+   // Guarantee that the operation is only executed once
+   if (markCancelled()) {
+   assignmentGroup.releaseSharedSlot(this);
+   }
+   }
+
+   /**
+    * Release this shared slot. In order to do this:
+    *
+    * 1. Cancel and release all sub slots atomically with respect to the assigned assignment group.
+    * 2. Set the state of the shared slot to be cancelled.
+    * 3. Dispose the shared slot (returning the slot to the instance).
+    *
+    * After cancelAndReleaseSubSlots, the shared slot is marked to be dead. This prevents further
+    * sub slot creation by the scheduler.
+    */
+   @Override
+   public void releaseSlot() {
+      assignmentGroup.releaseSharedSlot(this);
+   }
+
+   /**
+    * Creates a new sub slot if the slot is not dead, yet. This method should only be called from
+    * the assignment group instance to guarantee synchronization.
+    *
+    * @param jID id to identify tasks which can be deployed in this sub slot
+    * @return new sub slot if the shared slot is still alive, otherwise null
+    */

[jira] [Commented] (FLINK-1372) TaskManager and JobManager do not log startup settings any more

2015-01-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14282593#comment-14282593
 ] 

Till Rohrmann commented on FLINK-1372:
--

It always depends on the location from where you log. If you log the startup 
info from within an actor, then you should use the actor's logging object. But I 
think that we can get rid of the akka.loglevel config parameter so that logging 
will only be controlled by the logging properties file.

 TaskManager and JobManager do not log startup settings any more
 ---

 Key: FLINK-1372
 URL: https://issues.apache.org/jira/browse/FLINK-1372
 Project: Flink
  Issue Type: Bug
  Components: JobManager, TaskManager
Affects Versions: 0.9
Reporter: Stephan Ewen
Assignee: Till Rohrmann
 Fix For: 0.9


 In prior versions, the jobmanager and taskmanager logged a lot of startup 
 options:
  - Environment
  - ports
  - memory configuration
  - network configuration
 Currently, they log very little. We should add the logging back in.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

2015-01-19 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/311#issuecomment-70517390
  
IMO, users should only be allowed to set semantic properties through field 
expression strings. There should be no need to implement one's own 
SemanticProperty class or manually set the forwarded fields in SemanticProperty 
objects. Setting these fields is not trivial because field indices need to be 
flattened and the types of source and target fields should be checked for 
compatibility. From my point of view, this is clearly a developer API. Whoever 
gets in touch with it should not have problems dealing with input ids or 
implementing the backwards access.

The {{hasFieldForwardInfomation()}} can be removed though. Will do that.
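As an illustration of why setting forwarded fields manually is non-trivial, here is a hypothetical, minimal sketch (not Flink's actual parser) of how a field expression string like "f0; f2->f1" could be mapped to flat (source, target) index pairs; real handling would also flatten nested types and check type compatibility:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: parse forwarded-field expressions such as "f0; f2->f1"
// into (sourceIndex, targetIndex) pairs. Flink's real implementation also
// flattens nested tuple types and validates field types, which is omitted here.
public class ForwardedFieldsSketch {

    public static List<int[]> parse(String expression) {
        List<int[]> pairs = new ArrayList<int[]>();
        for (String part : expression.split(";")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] sides = trimmed.split("->");
            int source = fieldIndex(sides[0].trim());
            // "f0" alone means the field keeps its position in the output
            int target = sides.length > 1 ? fieldIndex(sides[1].trim()) : source;
            pairs.add(new int[]{source, target});
        }
        return pairs;
    }

    private static int fieldIndex(String field) {
        if (!field.startsWith("f")) {
            throw new IllegalArgumentException("Expected tuple field like f0, got: " + field);
        }
        return Integer.parseInt(field.substring(1));
    }
}
```

Calling `parse("f0; f2->f1")` yields the pairs (0, 0) and (2, 1); everything beyond simple `fN` names is an assumption of this sketch.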




[GitHub] flink pull request: [FLINK-1372] [runtime] Fix akka logging

2015-01-19 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/321

[FLINK-1372] [runtime] Fix akka logging

Fixes the logging settings. Logging is now exclusively controlled by the 
logging properties provided to the system; the akka.loglevel config parameter 
is removed.

The JobManager and TaskManager now log their startup settings properly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixAkkaLogging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/321.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #321


commit 94d96cb6574ada07296319f19b05283055f3029b
Author: Till Rohrmann trohrm...@apache.org
Date:   2015-01-19T15:13:52Z

[FLINK-1372] [runtime] Fixes logging settings. The logging is now 
exclusively controlled by the logging properties provided to the system. 
Removes akka.loglevel config parameter.






[jira] [Updated] (FLINK-1296) Add support for very large record for sorting

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-1296:
--
Fix Version/s: (was: 0.8)
   0.9

 Add support for very large record for sorting
 -

 Key: FLINK-1296
 URL: https://issues.apache.org/jira/browse/FLINK-1296
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 0.8
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 0.9


 Currently, very large records (multiple hundreds of megabytes) can break the 
 sorter if they overflow the sort buffer.
 Furthermore, if a merge of those records is attempted, pulling multiple of 
 them into memory concurrently can exhaust the machine's memory.





[jira] [Updated] (FLINK-937) Change the YARN Client to allocate all cluster resources, if no argument given

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-937:
-
Fix Version/s: (was: 0.8)
   0.9

 Change the YARN Client to allocate all cluster resources, if no argument given
 --

 Key: FLINK-937
 URL: https://issues.apache.org/jira/browse/FLINK-937
 Project: Flink
  Issue Type: Improvement
  Components: YARN Client
Reporter: Robert Metzger
Assignee: Robert Metzger
Priority: Minor
 Fix For: 0.9


 In order to further improve the user experience, I would like to change the 
 YARN client's behavior to allocate as many cluster resources as possible, if 
 the user does not specify differently.
 The majority of users have exclusive access to the cluster.





[jira] [Updated] (FLINK-1278) Remove the Record special code paths

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-1278:
--
Fix Version/s: (was: 0.8)
   0.9

 Remove the Record special code paths
 

 Key: FLINK-1278
 URL: https://issues.apache.org/jira/browse/FLINK-1278
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 0.8
Reporter: Stephan Ewen
Assignee: Kostas Tzoumas
 Fix For: 0.9


 There are some legacy Record code paths in the runtime, which are often 
 forgotten to be kept in sync and cause errors if people actually use records.





[jira] [Updated] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-1297:
--
Fix Version/s: (was: 0.8)
   0.9

 Add support for tracking statistics of intermediate results
 ---

 Key: FLINK-1297
 URL: https://issues.apache.org/jira/browse/FLINK-1297
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Alexander Alexandrov
Assignee: Alexander Alexandrov
 Fix For: 0.9

   Original Estimate: 1,008h
  Remaining Estimate: 1,008h

 One of the major problems related to the optimizer at the moment is the lack 
 of proper statistics.
 With the introduction of staged execution, it is possible to instrument the 
 runtime code with a statistics facility that collects the required 
 information for optimizing the next execution stage.
 I would therefore like to contribute code that can be used to gather basic 
 statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
 count distinct) and make them available to the job manager.
 Before I start, I would like to hear some feedback from the other users.
 In particular, to handle skew (e.g. on grouping) it might be good to have 
 some sort of detailed sketch about the key distribution of an intermediate 
 result. I am not sure whether a simple histogram is the most effective way to 
 go. Maybe somebody would propose another lightweight sketch that provides 
 better accuracy.
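One lightweight alternative to a full histogram for estimating the key distribution mentioned above is a count-min sketch. The following is a purely illustrative, minimal version (not a proposal for Flink's actual statistics API):

```java
import java.util.Random;

// Minimal count-min sketch for estimating per-key frequencies of an
// intermediate result. Illustrative only; estimates never underestimate
// the true count, and accuracy improves with width and depth.
public class CountMinSketch {

    private final long[][] counts;
    private final int[] seeds;
    private final int width;

    public CountMinSketch(int depth, int width) {
        this.counts = new long[depth][width];
        this.seeds = new int[depth];
        this.width = width;
        Random rnd = new Random(42);
        for (int i = 0; i < depth; i++) {
            seeds[i] = rnd.nextInt();
        }
    }

    // One hash function per row, derived from the key's hashCode and a seed.
    private int bucket(int row, Object key) {
        int h = key.hashCode() ^ seeds[row];
        return Math.abs(h % width);
    }

    public void add(Object key) {
        for (int row = 0; row < counts.length; row++) {
            counts[row][bucket(row, key)]++;
        }
    }

    // The minimum over all rows bounds the true frequency from above.
    public long estimate(Object key) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < counts.length; row++) {
            min = Math.min(min, counts[row][bucket(row, key)]);
        }
        return min;
    }
}
```

Such a sketch uses constant memory regardless of the number of distinct keys, which is why it might be preferable to a histogram for skew detection on large intermediate results.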





[jira] [Updated] (FLINK-938) Change start-cluster.sh script so that users don't have to configure the JobManager address

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-938:
-
Fix Version/s: (was: 0.8)
   0.9

 Change start-cluster.sh script so that users don't have to configure the 
 JobManager address
 ---

 Key: FLINK-938
 URL: https://issues.apache.org/jira/browse/FLINK-938
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Robert Metzger
Assignee: Mingliang Qi
Priority: Minor
 Fix For: 0.9


 To improve the user experience, Flink should not require users to configure 
 the JobManager's address on a cluster.
 In combination with FLINK-934, this would allow running Flink with decent 
 performance on a cluster without setting a single configuration value.





[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23169293
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -860,8 +910,50 @@ protected Properties getYarnProperties() throws 
IOException {
return yarnProperties;
}

-   protected Client getClient(CommandLine line, ClassLoader classLoader) throws IOException {
-      return new Client(getJobManagerAddress(line), getGlobalConfiguration(), classLoader);
+   protected Client getClient(CommandLine line, ClassLoader classLoader, String programName) throws IOException {
+      String jmAddrString = getJobManagerAddressString(line);
+      InetSocketAddress jobManagerAddress = null;
+      if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) {
+         System.out.println("YARN cluster mode detected. Switching Log4j output to console");
+         LogManager.getRootLogger().addAppender(new ConsoleAppender(new PatternLayout(DEFAULT_LOG4J_PATTERN_LAYOUT)));
--- End diff --

Do we have to hardwire log4j into our code? That contradicts slf4j's 
purpose. Would be great if we could get rid of that.




[GitHub] flink pull request: [FLINK-1406] update Flink compatibility notice

2015-01-19 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/314#issuecomment-70519951
  
Good to merge




[GitHub] flink pull request: [FLINK-1392] Add Kryo serializer for Protobuf

2015-01-19 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/322

[FLINK-1392] Add Kryo serializer for Protobuf

I've checked the added dependencies; they are not overriding any versions, and 
no transitive dependencies are added.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink1392

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/322.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #322


commit ed629e3e23001a0761d116d8c1151a65d88501eb
Author: Robert Metzger rmetz...@apache.org
Date:   2015-01-13T09:21:29Z

[FLINK-1392] Add Kryo serializer for Protobuf






[GitHub] flink pull request: [FLINK-1003] Added spread out scheduling strat...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/60




[jira] [Assigned] (FLINK-1003) Spread out scheduling strategy

2015-01-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-1003:


Assignee: (was: Till Rohrmann)

 Spread out scheduling strategy
 --

 Key: FLINK-1003
 URL: https://issues.apache.org/jira/browse/FLINK-1003
 Project: Flink
  Issue Type: Improvement
Reporter: Till Rohrmann

 Currently the Flink scheduler tries to fill one instance completely before 
 the tasks are deployed to another instance. This is a good behaviour in 
 multi-user and multi-job scenarios but it wastes resources if one wants to 
 use the complete cluster. Therefore, another scheduling strategy where the 
 load among the different instances is kept balanced might be useful. This 
 spread out strategy will deploy the tasks such that the overall work is 
 equally distributed.





[jira] [Commented] (FLINK-1418) Make 'print()' output on the client command line, rather than on the task manager sysout

2015-01-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14282360#comment-14282360
 ] 

Fabian Hueske commented on FLINK-1418:
--

I agree, printing to the TM's std-out is pretty pointless and doesn't help 
anybody, and local testing will not be affected.
However, this change should come with a big warning sign in the documentation 
and JavaDocs because it might cause huge results to be transferred to the 
client.

 Make 'print()' output on the client command line, rather than on the task 
 manager sysout
 

 Key: FLINK-1418
 URL: https://issues.apache.org/jira/browse/FLINK-1418
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Affects Versions: 0.9
Reporter: Stephan Ewen

 Right now, the {{print()}} command prints inside the data sinks where the 
 code runs. It should pull data back to the client and print it there.





[jira] [Commented] (FLINK-1419) DistributedCache doesn't preserve files for subsequent operations

2015-01-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14282393#comment-14282393
 ] 

Fabian Hueske commented on FLINK-1419:
--

I think it is fine to make files in the DC immutable (read only). An operator 
that wants to modify files can create a local writable copy.

Files in the DC should only be copied once per TM and stay until the job is 
finished, IMO.
The question is who initiates the copy process. The first task that requires 
the file? In that case, all other tasks need to recognize that the file is 
being copied by another task and wait until the copying is completed.
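A minimal sketch of the "first task copies, the rest wait" coordination described above, using a per-file future (this is hypothetical illustration, not Flink's DistributedCache code; copyFile is a stand-in for the actual copy from the distributed file system):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.FutureTask;

// Hypothetical sketch: the first task to request a cached file wins the race
// via putIfAbsent and performs the copy; all other tasks block on the same
// FutureTask until the copy is complete.
public class CopyOnceCache {

    private final ConcurrentMap<String, FutureTask<String>> copies =
            new ConcurrentHashMap<String, FutureTask<String>>();

    public String getFile(final String source) {
        FutureTask<String> task = copies.get(source);
        if (task == null) {
            FutureTask<String> newTask = new FutureTask<String>(new Callable<String>() {
                @Override
                public String call() {
                    return copyFile(source); // returns the local path of the copy
                }
            });
            task = copies.putIfAbsent(source, newTask);
            if (task == null) {
                // this thread registered the task first, so it runs the copy
                task = newTask;
                newTask.run();
            }
        }
        try {
            return task.get(); // other tasks block here until the copy finishes
        } catch (Exception e) {
            throw new RuntimeException("Copy of " + source + " failed", e);
        }
    }

    // Stand-in for the actual copy from the distributed file system.
    private String copyFile(String source) {
        return "/tmp/cache/" + source;
    }
}
```

The same map could then drive the clean-up step: when the job finishes, the TaskManager iterates over the registered entries and deletes the local copies.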

 DistributedCache doesn't preserve files for subsequent operations
 --

 Key: FLINK-1419
 URL: https://issues.apache.org/jira/browse/FLINK-1419
 Project: Flink
  Issue Type: Bug
Affects Versions: 0.8, 0.9
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler

 When subsequent operations want to access the same files in the DC it 
 frequently happens that the files are not created for the following operation.
 This is fairly odd, since the DC is supposed to either a) preserve files when 
 another operation kicks in within a certain time window, or b) just recreate 
 the deleted files. Neither of these happens.
 Increasing the time window had no effect.
 I'd like to use this issue as a starting point for a more general discussion 
 about the DistributedCache. 
 Currently:
 1. all files reside in a common job-specific directory
 2. are deleted during the job.
  
 One thing that was brought up about Trait #1 is that it basically forbids 
 modification of the files, concurrent access and all. Personally, I'm not sure 
 if this is a problem. Changing it to a task-specific place solved the issue, 
 though.
 I'm more concerned about Trait #2. Besides the mentioned issue, the deletion 
 is realized with the scheduler, which adds a lot of complexity to the current 
 code. (It really is a pain to work on...)
 If we moved the deletion to the end of the job, it could be done as a clean-up 
 step in the TaskManager. With this we could reduce the DC to a 
 cacheFile(String source) method, the delete method in the TM, and throw out 
 everything else.
 Also, the current implementation implies that big files may be copied 
 multiple times. This may be undesired, depending on how big the files are.





[GitHub] flink pull request: [FLINK-1165] No createCollectionsEnvironment i...

2015-01-19 Thread ajaybhat
GitHub user ajaybhat opened a pull request:

https://github.com/apache/flink/pull/320

[FLINK-1165] No createCollectionsEnvironment in Java API

This commit adds a new method to ExecutionEnvironment to create a
CollectionEnvironment, and applies the method to cases where a
CollectionEnvironment() may be needed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ajaybhat/flink FLINK-1165

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/320.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #320


commit baace5236340831789cddffaa75865f3c5dc3bb3
Author: Ajay Bhat ajayb...@air-watch.com
Date:   2015-01-19T10:45:24Z

[FLINK-1165] No createCollectionsEnvironment in Java API

This commit adds a new method to ExecutionEnvironment to create a
CollectionEnvironment, and applies the method to cases where a
CollectionEnvironment() may be needed






[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23169866
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Class handling the command line interface to the YARN session.
+ */
+public class FlinkYarnSessionCli {
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
+
+   // Constants -------------------------------------------------------------
+
+   private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
+   public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
+   public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
+
+
+   private static final int CLIENT_POLLING_INTERVALL = 3;
+
+
+   // Command Line argument options ------------------------------------------
+   // the prefix transformation is used by the CliFrontend static constructor.
+   private final Option QUERY;
+   // --- or ---
+   private final Option QUEUE;
+   private final Option SHIP_PATH;
+   private final Option FLINK_JAR;
+   private final Option JM_MEMORY;
+   private final Option TM_MEMORY;
+   private final Option CONTAINER;
+   private final Option SLOTS;
+
+   /**
+    * Dynamic properties allow the user to specify additional configuration values with -D, such as
+    * -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
+    */
+   private final Option DYNAMIC_PROPERTIES;
+
+   private AbstractFlinkYarnCluster yarnCluster = null;
+
+   public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
+      QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
+      QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
+      SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
+      FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
+      JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+      TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
+      CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
+      SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
+      DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic 

[jira] [Commented] (FLINK-1400) In local mode, the default TaskManager won't listen on the data port.

2015-01-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14282690#comment-14282690
 ] 

Stephan Ewen commented on FLINK-1400:
-

Good point. The Webfrontend could write {{-1}} or {{N/A}} for the data port. If 
you want, open a separate issue for that and if you want to make a patch, let 
us know.

 In local mode, the default TaskManager won't listen on the data port.
 -

 Key: FLINK-1400
 URL: https://issues.apache.org/jira/browse/FLINK-1400
 Project: Flink
  Issue Type: Bug
Affects Versions: 0.9
 Environment: Ubuntu 14.04 LTS
Reporter: Sergey Dudoladov
Priority: Minor

  The Task Manager automatically started by the Job Manager (JobManager.scala, 
 appr. line  470)  in the local mode does not listen on the dataport. 
 To reproduce:
 1) Start Flink via ./start-local.sh
 2) Look up the data port number on localhost:8081 - Task Managers tab
 3) sudo netstat -taupen | grep dataport_number 
  
 Or  start the second Task Manager and run  Flink with the degree of 
 parallelism 2 (assuming one slot per Task Manager)
 4) ./flink run -p 2 ...
 Task Managers started via ./taskmanager.sh work fine.





[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23171500
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java ---
@@ -86,7 +86,7 @@ public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) th
       case SLOW_CONNECT:
          boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
          if (correct) {
-            LOG.info("Determined " + i + " as the TaskTracker's own IP address");
+            LOG.info("Determined " + i + " as the machine's own IP address");
--- End diff --

Same here with the logging string.




[jira] [Commented] (FLINK-1372) TaskManager and JobManager do not log startup settings any more

2015-01-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14282739#comment-14282739
 ] 

Till Rohrmann commented on FLINK-1372:
--

I think, it should work now with my latest PR.

 TaskManager and JobManager do not log startup settings any more
 ---

 Key: FLINK-1372
 URL: https://issues.apache.org/jira/browse/FLINK-1372
 Project: Flink
  Issue Type: Bug
  Components: JobManager, TaskManager
Affects Versions: 0.9
Reporter: Stephan Ewen
Assignee: Till Rohrmann
 Fix For: 0.9


 In prior versions, the jobmanager and taskmanager logged a lot of startup 
 options:
  - Environment
  - ports
  - memory configuration
  - network configuration
 Currently, they log very little. We should add the logging back in.


