[GitHub] flink pull request: Rename coGroupDataSet.scala to CoGroupDataSet....

2015-01-19 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/324#issuecomment-70600130
  
FYI @aljoscha 




[GitHub] flink pull request: [FLINK-1376] [runtime] Add proper shared slot ...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/318#discussion_r23195339
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class represents a shared slot. A shared slot can have multiple
+ * {@link org.apache.flink.runtime.instance.SimpleSlot} instances within itself. This allows to
+ * schedule multiple tasks simultaneously, enabling Flink's streaming capabilities.
+ *
+ * IMPORTANT: This class contains no synchronization. Thus, the caller has to guarantee proper
+ * synchronization. In the current implementation, all concurrently modifying operations are
+ * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
+ * synchronization.
+ *
+ */
+public class SharedSlot extends Slot {
+
+   private final SlotSharingGroupAssignment assignmentGroup;
+
+   private final Set<Slot> subSlots;
+
+   public SharedSlot(JobID jobID, Instance instance, int slotNumber,
+                     SlotSharingGroupAssignment assignmentGroup, SharedSlot parent,
+                     AbstractID groupID) {
+      super(jobID, instance, slotNumber, parent, groupID);
+
+      this.assignmentGroup = assignmentGroup;
+      this.subSlots = new HashSet<Slot>();
+   }
+
+   public Set<Slot> getSubSlots() {
+      return subSlots;
+   }
+
+   /**
+    * Removes the simple slot from the {@link org.apache.flink.runtime.instance.SharedSlot}. Should
+    * only be called through the
+    * {@link org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} attribute
+    * assignmentGroup.
+    *
+    * @param slot slot to be removed from the set of sub slots.
+    * @return Number of remaining sub slots
+    */
+   public int freeSubSlot(Slot slot){
+      if(!subSlots.remove(slot)){
+         throw new IllegalArgumentException("Wrong shared slot for sub slot.");
+      }
+
+      return subSlots.size();
+   }
+
+   @Override
+   public int getNumberLeaves() {
+      int result = 0;
+
+      for(Slot slot: subSlots){
+         result += slot.getNumberLeaves();
+      }
+
+      return result;
+   }
+
+   @Override
+   public void cancel() {
+      // Guarantee that the operation is only executed once
+      if (markCancelled()) {
+         assignmentGroup.releaseSharedSlot(this);
+      }
+   }
+
+   /**
+    * Release this shared slot. In order to do this:
+    *
+    * 1. Cancel and release all sub slots atomically with respect to the assigned assignment group.
+    * 2. Set the state of the shared slot to be cancelled.
+    * 3. Dispose the shared slot (returning the slot to the instance).
+    *
+    * After cancelAndReleaseSubSlots, the shared slot is marked to be dead. This prevents further
+    * sub slot creation by the scheduler.
+    */
+   @Override
+   public void releaseSlot() {
+      assignmentGroup.releaseSharedSlot(this);
+   }
+
+   /**
+    * Creates a new sub slot if the slot is not dead, yet. This method should only be called from
+    * the assignment group instance to guarantee synchronization.
+    *
+    * @param jID id to identify tasks which can be deployed in this sub slot
+    * @return new sub slot if the shared slot is still alive, otherwise null
+    */
+   pub
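The quoted diff breaks off above. Judging from the Javadoc, the remainder was presumably along the lines of the following sketch (a reconstruction, not the verbatim original; `SimpleSlot` and `isAlive()` are assumed from the surrounding runtime code):

```java
public SimpleSlot allocateSubSlot(AbstractID jID) {
   if (isAlive()) {
      // Sub slots are numbered consecutively within this shared slot.
      SimpleSlot slot = new SimpleSlot(getJobID(), getInstance(), subSlots.size(), this, jID);
      subSlots.add(slot);
      return slot;
   } else {
      // The slot was already released; the scheduler must not create new sub slots.
      return null;
   }
}
```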

[GitHub] flink pull request: Rename coGroupDataSet.scala to CoGroupDataSet....

2015-01-19 Thread hsaputra
GitHub user hsaputra opened a pull request:

https://github.com/apache/flink/pull/324

Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala 
to CrossDataSet.scala

This PR contains changes to follow Scala style:
- Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala to 
CrossDataSet.scala
- Move the UnfinishedCoGroupOperation class into its own Scala file

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hsaputra/flink rename_coGroupDataSet_filename

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #324


commit 85d0dbfb506b954c53ece5ff8f825df5fbde1ed8
Author: Henry Saputra 
Date:   2015-01-19T22:52:30Z

Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala 
to CrossDataSet.scala

commit fa9f37c189e397458df4afd89af4a0025373ec84
Author: Henry Saputra 
Date:   2015-01-19T23:29:51Z

Move the UnfinishedCoGroupOperation class into its own Scala file.

The UnfinishedCoGroupOperation class does not relate closely to CoGroupOperation 
via a sealed modifier, so per the Scala style guide [1] I propose moving it to a 
separate file.

[1] http://docs.scala-lang.org/style/files.html






[GitHub] flink pull request: [FLINK-1372] [runtime] Fix akka logging

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/321#discussion_r23188009
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---
@@ -204,12 +192,16 @@ object AkkaUtils {
   |
   |  loggers = ["akka.event.slf4j.Slf4jLogger"]
   |  logger-startup-timeout = 30s
-  |  loglevel = "WARNING"
-  |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+  |  loglevel = "DEBUG"
--- End diff --

Well actually, it is not optimal, since now all debug messages are 
generated but not printed. I'll fix it.
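For reference, the fixed settings would presumably keep DEBUG as the Akka loglevel but re-add the SLF4J filter, so that events for disabled levels are discarded before the messages are constructed (a sketch; the `logging-filter` line is exactly the one the diff above removed):

```
loggers = ["akka.event.slf4j.Slf4jLogger"]
logger-startup-timeout = 30s
loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
```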




[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/292#issuecomment-70559819
  
Thank you for reviewing the code. I hope I have time tomorrow to address 
all remarks.




[jira] [Resolved] (FLINK-1420) Small cleanup on code after 0.8 release

2015-01-19 Thread Henry Saputra (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Saputra resolved FLINK-1420.
--
Resolution: Fixed

> Small cleanup on code after 0.8 release
> ---
>
> Key: FLINK-1420
> URL: https://issues.apache.org/jira/browse/FLINK-1420
> Project: Flink
>  Issue Type: Improvement
>Reporter: Henry Saputra
>Assignee: Henry Saputra
>Priority: Minor
> Fix For: 0.8.1, master
>
>
> This issue tracks PR https://github.com/apache/flink/pull/302 for master and the 
> 0.8.1 release





[GitHub] flink pull request: FLINK-1420 Small cleanup on code after branch ...

2015-01-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/302




[jira] [Commented] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming

2015-01-19 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282950#comment-14282950
 ] 

Robert Metzger commented on FLINK-1421:
---

+10. Very nice that you started working on this. I really like seeing Flink 
integrated with other projects!

> Implement a SAMOA Adapter for Flink Streaming
> -
>
> Key: FLINK-1421
> URL: https://issues.apache.org/jira/browse/FLINK-1421
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Yahoo's Samoa is an experimental incremental machine learning library that 
> builds on an abstract compositional data streaming model to write streaming 
> algorithms. The task is to provide an adapter from SAMOA topologies to 
> Flink-streaming job graphs in order to support Flink as a backend engine for 
> SAMOA tasks.
> A startup guide can be viewed here:
> https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub
> The main working branch of the adapter:
> https://github.com/senorcarbone/samoa/tree/flink





[jira] [Created] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming

2015-01-19 Thread Paris Carbone (JIRA)
Paris Carbone created FLINK-1421:


 Summary: Implement a SAMOA Adapter for Flink Streaming
 Key: FLINK-1421
 URL: https://issues.apache.org/jira/browse/FLINK-1421
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Reporter: Paris Carbone
Assignee: Paris Carbone


Yahoo's Samoa is an experimental incremental machine learning library that 
builds on an abstract compositional data streaming model to write streaming 
algorithms. The task is to provide an adapter from SAMOA topologies to 
Flink-streaming job graphs in order to support Flink as a backend engine for 
SAMOA tasks.

A startup guide can be viewed here:
https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub

The main working branch of the adapter:
https://github.com/senorcarbone/samoa/tree/flink





[jira] [Updated] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming

2015-01-19 Thread Paris Carbone (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paris Carbone updated FLINK-1421:
-
Description: 
Yahoo's Samoa is an experimental incremental machine learning library that 
builds on an abstract compositional data streaming model to write streaming 
algorithms. The task is to provide an adapter from SAMOA topologies to 
Flink-streaming job graphs in order to support Flink as a backend engine for 
SAMOA tasks.

A startup guide can be viewed here:
https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub

The main working branch of the adapter:
https://github.com/senorcarbone/samoa/tree/flink

  was:
Yahoo's Samoa is an experimental incremental machine learning library that 
builds on an abstract compositional data streaming model to write streaming 
algorithms. The task is to provide an adapter from SAMOA topologies to 
Flink-streaming job graphs in order to support Flink as a backend engine for 
SAMOA tasks.

An statup guide can be viewed here :
https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub

The main working branch of the adapter :
https://github.com/senorcarbone/samoa/tree/flink


> Implement a SAMOA Adapter for Flink Streaming
> -
>
> Key: FLINK-1421
> URL: https://issues.apache.org/jira/browse/FLINK-1421
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Yahoo's Samoa is an experimental incremental machine learning library that 
> builds on an abstract compositional data streaming model to write streaming 
> algorithms. The task is to provide an adapter from SAMOA topologies to 
> Flink-streaming job graphs in order to support Flink as a backend engine for 
> SAMOA tasks.
> A startup guide can be viewed here:
> https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub
> The main working branch of the adapter:
> https://github.com/senorcarbone/samoa/tree/flink





[jira] [Updated] (FLINK-1420) Small cleanup on code after 0.8 release

2015-01-19 Thread Henry Saputra (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Saputra updated FLINK-1420:
-
Issue Type: Improvement  (was: Bug)

> Small cleanup on code after 0.8 release
> ---
>
> Key: FLINK-1420
> URL: https://issues.apache.org/jira/browse/FLINK-1420
> Project: Flink
>  Issue Type: Improvement
>Reporter: Henry Saputra
>Assignee: Henry Saputra
>Priority: Minor
> Fix For: 0.8.1, master
>
>
> This issue tracks PR https://github.com/apache/flink/pull/302 for master and the 
> 0.8.1 release





[jira] [Created] (FLINK-1420) Small cleanup on code after 0.8 release

2015-01-19 Thread Henry Saputra (JIRA)
Henry Saputra created FLINK-1420:


 Summary: Small cleanup on code after 0.8 release
 Key: FLINK-1420
 URL: https://issues.apache.org/jira/browse/FLINK-1420
 Project: Flink
  Issue Type: Bug
Reporter: Henry Saputra
Assignee: Henry Saputra
Priority: Minor
 Fix For: master, 0.8.1


This issue tracks PR https://github.com/apache/flink/pull/302 for master and the 
0.8.1 release





[GitHub] flink pull request: [FLINK-1372] [runtime] Fix akka logging

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/321#discussion_r23177318
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---
@@ -204,12 +192,16 @@ object AkkaUtils {
   |
   |  loggers = ["akka.event.slf4j.Slf4jLogger"]
   |  logger-startup-timeout = 30s
-  |  loglevel = "WARNING"
-  |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+  |  loglevel = "DEBUG"
--- End diff --

Yeah, that is right. But it will only log debug messages if the user has also 
specified this level in their logging properties file.
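In other words, with the SLF4J filter in place the effective level comes from the user's logging backend, e.g. a log4j.properties along these lines (a hypothetical example, not from the PR):

```
# Akka debug output is filtered out unless the root level is raised to DEBUG
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```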




[GitHub] flink pull request: Implement the convenience methods count and co...

2015-01-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/210#discussion_r23176867
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java ---
@@ -23,8 +23,8 @@
 import static org.junit.Assert.fail;
 
 import org.junit.Test;
-
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.AbstractID;
--- End diff --

Never mind, I forgot to push the last changes I made.




[GitHub] flink pull request: Update incubator-flink name in the merge pull ...

2015-01-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/313




[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/292#issuecomment-70536571
  
As far as I can tell without running tests on the cluster, it looks good to me. 
Only some minor remarks, left as in-code comments.




[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23176494
  
--- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala ---
@@ -31,6 +31,7 @@ trait YarnTaskManager extends ActorLogMessages {
 
   def receiveYarnMessages: Receive = {
 case StopYarnSession(status) => {
+  log.info(s"Stopping YARN TaskManager with final application status $status")
--- End diff --

{}-placeholder
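("{}-placeholder" is shorthand for Akka's parameterized logging: instead of eagerly interpolating with `s"..."`, one would presumably write `log.info("Stopping YARN TaskManager with final application status {}", status)`, so the message string is only built when the log level is actually enabled.)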




[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23176437
  
--- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---
@@ -153,15 +159,24 @@ object ApplicationMaster{
 output.close()
   }
 
-  def startJobManager(currDir: String): (ActorSystem, ActorRef) = {
+  def startJobManager(currDir: String, hostname: String, dynamicPropertiesEncodedString: String):
+    (ActorSystem, ActorRef) = {
 LOG.info("Start job manager for yarn")
-val pathToConfig = s"$currDir/$MODIFIED_CONF_FILE"
-val args = Array[String]("--configDir", pathToConfig)
+val args = Array[String]("--configDir", currDir)
 
-LOG.info(s"Config path: ${pathToConfig}.")
-val (hostname, port, configuration, _) = JobManager.parseArgs(args)
+LOG.info(s"Config path: ${currDir}.")
+val (_, _, configuration, _) = JobManager.parseArgs(args)
+
+// add dynamic properties to JobManager configuration.
+val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
+import scala.collection.JavaConverters._
+for(property <- dynamicProperties.asScala){
+  configuration.setString(property.f0, property.f1)
+}
+GlobalConfiguration.getConfiguration.addAll(configuration) // make part of globalConf.
--- End diff --

GlobalConfiguration is bad.
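(Presumably the objection: `GlobalConfiguration` is mutable, process-global state, so populating it here makes the effective configuration depend on initialization order and hard to test; passing the assembled `Configuration` object explicitly to the components that need it would keep the data flow visible.)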




[GitHub] flink pull request: Implement the convenience methods count and co...

2015-01-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/210#discussion_r23176327
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java ---
@@ -23,8 +23,8 @@
 import static org.junit.Assert.fail;
 
 import org.junit.Test;
-
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.AbstractID;
--- End diff --

As far as I see, `AbstractIDTest` is already part of `flink-core`.




[jira] [Comment Edited] (FLINK-655) Add support for both single and set of broadcast values

2015-01-19 Thread Henry Saputra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14281923#comment-14281923
 ] 

Henry Saputra edited comment on FLINK-655 at 1/19/15 6:00 PM:
--

So, the options are:
1. Use this JIRA as a vehicle to add new APIs for adding a set or an individual 
broadcast variable. This will break API compatibility for the 0.9 release.
2. Use this JIRA to deprecate the existing methods and promote new ones, 
withBroadcastValue and getBroadcastValue, just for naming consistency.
3. Do nothing for now and close this one as won't fix.

Actually, since both methods are called from different contexts (one from 
RuntimeContext and the other from Operator), the different names should not annoy 
customers that much.


was (Author: hsaputra):
So, the option are:
1. Use this JIRA as vehicle to add new APIs to add set or individual broadcast 
variable. This will break API compatibility for 0.9 release.
2. Use this JIRA to deprecate use of existing one and promote new ones of 
withBroadcastVariable for just naming consistency.
3. Do nothing for now and close this one as will not fix.

Actually since both methods are called from different contexts, one is from 
RuntimeContext and the other from Operator, different names should not annoy 
customers that much.

> Add support for both single and set of broadcast values
> ---
>
> Key: FLINK-655
> URL: https://issues.apache.org/jira/browse/FLINK-655
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Ufuk Celebi
>Assignee: Henry Saputra
>  Labels: breaking-api, github-import, starter
> Fix For: pre-apache
>
>
> To broadcast a data set you have to do the following:
> ```java
> lorem.map(new MyMapper()).withBroadcastSet(toBroadcast, "toBroadcastName")
> ```
> In the operator you call:
> ```java
> getRuntimeContext().getBroadcastVariable("toBroadcastName")
> ```
> I propose to have both method names consistent, e.g.
>   - `withBroadcastVariable(DataSet, String)`, or
>   - `getBroadcastSet(String)`.
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/655
> Created by: [uce|https://github.com/uce]
> Labels: enhancement, java api, user satisfaction, 
> Created at: Wed Apr 02 16:29:08 CEST 2014
> State: open
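For context, the two halves of the API under discussion fit together as follows — a minimal sketch against the Java API of that era, reusing the names from the issue text (`MyMapper` is inlined as an anonymous function here):

```java
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastExample {
   public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
      DataSet<String> lorem = env.fromElements("lorem", "ipsum");

      lorem.map(new RichMapFunction<String, String>() {
         private List<Integer> broadcastValues;

         @Override
         public void open(Configuration parameters) {
            // Getter side: the name must match the one given to withBroadcastSet below.
            broadcastValues = getRuntimeContext().getBroadcastVariable("toBroadcastName");
         }

         @Override
         public String map(String value) {
            return value + "/" + broadcastValues.size();
         }
      }).withBroadcastSet(toBroadcast, "toBroadcastName") // setter side
        .print();

      env.execute();
   }
}
```

The naming mismatch the issue describes is visible in the pair: withBroadcastSet on the operator versus getBroadcastVariable on the runtime context.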





[jira] [Comment Edited] (FLINK-655) Add support for both single and set of broadcast values

2015-01-19 Thread Henry Saputra (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14281923#comment-14281923
 ] 

Henry Saputra edited comment on FLINK-655 at 1/19/15 6:00 PM:
--

So, the options are:
1. Use this JIRA as a vehicle to add new APIs for adding a set or an individual 
broadcast variable. This will break API compatibility for the 0.9 release.
2. Use this JIRA to deprecate the existing methods and promote a new 
withBroadcastVariable, just for naming consistency.
3. Do nothing for now and close this one as won't fix.

Actually, since both methods are called from different contexts (one from 
RuntimeContext and the other from Operator), the different names should not annoy 
customers that much.


was (Author: hsaputra):
So, the option are:
1. Use this JIRA as vehicle to add new APIs to add set or individual broadcast 
variable. This will break API compatibility for 0.9 release.
2. Use this JIRA to deprecate use of existing one and promote new ones of 
withBroadcastVariable and getBroadcastVariable for just naming consistency.
3. Do nothing for now and close this one as will not fix.

Actually since both methods are called from different contexts, one is from 
RuntimeContext and the other from Operator, different names should not annoy 
customers that much.

> Add support for both single and set of broadcast values
> ---
>
> Key: FLINK-655
> URL: https://issues.apache.org/jira/browse/FLINK-655
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Ufuk Celebi
>Assignee: Henry Saputra
>  Labels: breaking-api, github-import, starter
> Fix For: pre-apache
>
>
> To broadcast a data set you have to do the following:
> ```java
> lorem.map(new MyMapper()).withBroadcastSet(toBroadcast, "toBroadcastName")
> ```
> In the operator you call:
> ```java
> getRuntimeContext().getBroadcastVariable("toBroadcastName")
> ```
> I propose to have both method names consistent, e.g.
>   - `withBroadcastVariable(DataSet, String)`, or
>   - `getBroadcastSet(String)`.
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/655
> Created by: [uce|https://github.com/uce]
> Labels: enhancement, java api, user satisfaction, 
> Created at: Wed Apr 02 16:29:08 CEST 2014
> State: open





[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23176016
  
--- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---
@@ -41,9 +41,9 @@ object ApplicationMaster{
   val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
 
   def main(args: Array[String]): Unit ={
-val yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME)
-LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " +
-  s"' setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}'")
+val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME)
+LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName}" +
--- End diff --

{}-placeholder




[GitHub] flink pull request: Implement the convenience methods count and co...

2015-01-19 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/210#issuecomment-70534961
  
I applied the proposed changes and rebased onto the current master. The changes 
are only reflected in the Java API so far and still need to be added to the Scala 
API as well.




[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23175611
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+import static akka.pattern.Patterns.ask;
+
+import akka.actor.Props;
+import akka.util.Timeout;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.None$;
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Awaitable;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnCluster.class);
+
+   private static final int POLLING_THREAD_INTERVAL_MS = 1000;
+
+   private YarnClient yarnClient;
+   private Thread actorRunner;
+   private Thread clientShutdownHook = new ClientShutdownHook();
+   private PollingThread pollingRunner;
+   private Configuration hadoopConfig;
+   // (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown.
+   private Path sessionFilesDir;
+   private InetSocketAddress jobManagerAddress;
+
+   //-- Class internal fields ---
+
+   private ActorSystem actorSystem;
+   private ActorRef applicationClient;
+   private ApplicationReport intialAppReport;
+   private static FiniteDuration akkaDuration = Duration.apply(5, TimeUnit.SECONDS);
+   private static Timeout akkaTimeout = Timeout.durationToTimeout(akkaDuration);
+
+   public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId,
+                           Configuration hadoopConfig, Path sessionFilesDir) throws IOException, YarnException {
+      this.yarnClient = yarnClient;
+      this.hadoopConfig = hadoopConfig;
+      this.sessionFilesDir = sessionFilesDir;
+
+      // get one application report manually
+      intialAppReport = yarnClient.getApplicationReport(appId);
+      String jobManagerHost = intialAppReport.getHost();
+      int jobManagerPort = intialAppReport.getRpcPort();
+      this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
+
+      // start actor system
+      LOG.info("Start actor system.");
+      InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
+      actorSystem = YarnUtils.createActorSystem(ownHostname.getCanonicalHostName(), 0, GlobalConfiguration.getConfiguration()

[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23175580
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---

[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23175486
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---

[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23175364
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---

[jira] [Commented] (FLINK-1372) TaskManager and JobManager do not log startup settings any more

2015-01-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282739#comment-14282739
 ] 

Till Rohrmann commented on FLINK-1372:
--

I think it should work now with my latest PR.

> TaskManager and JobManager do not log startup settings any more
> ---
>
> Key: FLINK-1372
> URL: https://issues.apache.org/jira/browse/FLINK-1372
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
> Fix For: 0.9
>
>
> In prior versions, the jobmanager and taskmanager logged a lot of startup 
> options:
>  - Environment
>  - ports
>  - memory configuration
>  - network configuration
> Currently, they log very little. We should add the logging back in.





[jira] [Commented] (FLINK-1303) HadoopInputFormat does not work with Scala API

2015-01-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282734#comment-14282734
 ] 

Stephan Ewen commented on FLINK-1303:
-

Ah, I have seen another user mention this as well. Good to know that this is 
the cause.

The broader fix would then be to have Java tuple support in the Scala API?

> HadoopInputFormat does not work with Scala API
> --
>
> Key: FLINK-1303
> URL: https://issues.apache.org/jira/browse/FLINK-1303
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.9
>
>
> It fails because the HadoopInputFormat uses the Flink Tuple2 type. For this, 
> type extraction fails at runtime.





[jira] [Commented] (FLINK-1372) TaskManager and JobManager do not log startup settings any more

2015-01-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282729#comment-14282729
 ] 

Stephan Ewen commented on FLINK-1372:
-

The logging of startup options would happen in large part in the companion 
object's main method, and then in the actor constructor, before any messages are 
processed or any parallelism comes into play. Can we have a simple SLF4J logger in 
the companion object and pass it to the constructor?
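(I.e., presumably something like a `val LOG = LoggerFactory.getLogger(classOf[JobManager])` defined in the companion object and handed over when the actor is constructed, so the static startup path and the actor log through the same logger.)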

> TaskManager and JobManager do not log startup settings any more
> ---
>
> Key: FLINK-1372
> URL: https://issues.apache.org/jira/browse/FLINK-1372
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
> Fix For: 0.9
>
>
> In prior versions, the jobmanager and taskmanager logged a lot of startup 
> options:
>  - Environment
>  - ports
>  - memory configuration
>  - network configuration
> Currently, they log very little. We should add the logging back in.





[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23173266
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * All classes in this package contain code taken from
+ * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+ * and
+ * https://github.com/hortonworks/simple-yarn-app
+ * and
+ * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+ *
+ * The Flink jar is uploaded to HDFS by this client.
+ * The application master and all the TaskManager containers get the jar file downloaded
+ * by YARN into their local fs.
+ *
+ */
+public class FlinkYarnClient extends AbstractFlinkYarnClient {
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
+
+   /**
+    * Constants,
+    * all starting with ENV_ are used as environment variables to pass values from the Client
+    * to the Application Master.
+    */
+   public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
+   public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
+   public final static String ENV_APP_ID = "_APP_ID";
+   public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+   public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
+   public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
+   public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+   public static final String ENV_SLOTS = "_SLOTS";
+   public static final Strin

[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23173179
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---

[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23173072
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---
@@ -0,0 +1,653 @@
[quoted FlinkYarnClient.java diff omitted; identical to the excerpt quoted 
above, truncated before the review comment]

[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...

2015-01-19 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/304#issuecomment-70527831
  
My point is that we cannot make all types anyone would ever use work out of 
the box. Any sample of types that work out of the box is incomplete and somewhat 
arbitrary, and we have to be careful not to trash the Kryo instances by 
overloading them with pre-registered serializers. Registering default 
serializers is actually a bit inefficient (an array list with move operations).

I thought that having a util that you can call to register the serializers for 
a certain lib/topic would make things transparent, easy for the user, and keep 
Kryo "slim" initially.

BTW: We may actually start thinking about Kryo pooling, if we want to add 
vastly more default serializers.
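
A sketch of the kind of per-library registration util meant here, assuming the 
Joda serializer from the `kryo-serializers` library (the class and method names 
below are illustrative, not existing Flink API):

```java
import com.esotericsoftware.kryo.Kryo;
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
import org.joda.time.DateTime;

// Hypothetical opt-in registration utility: the user explicitly pulls in
// the serializers for one library, so the default Kryo instance stays slim.
public final class JodaTimeRegistrations {

    private JodaTimeRegistrations() {}

    public static void registerJodaTime(Kryo kryo) {
        // Registering by class gives Kryo a compact integer tag for the
        // type instead of writing the full class name with every record.
        kryo.register(DateTime.class, new JodaDateTimeSerializer());
    }
}
```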




[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23172930
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---
@@ -0,0 +1,653 @@
[quoted FlinkYarnClient.java diff omitted; identical to the excerpt quoted 
above, truncated before the review comment]

[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23172849
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---
@@ -0,0 +1,653 @@
[quoted FlinkYarnClient.java diff omitted; identical to the excerpt quoted 
above, truncated before the review comment]

[jira] [Commented] (FLINK-1418) Make 'print()' output on the client command line, rather than on the task manager sysout

2015-01-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282706#comment-14282706
 ] 

Stephan Ewen commented on FLINK-1418:
-

It also breaks existing programs, because {{print()}} will immediately trigger 
execution. If you have another {{env.execute()}} call after that, then the call 
will complain that no sinks are defined.
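
A minimal sketch of the breaking pattern (hypothetical user program):

{code}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> data = env.fromElements("a", "b", "c");

data.print();   // under the proposed change, this eagerly triggers execution
env.execute();  // this second call then fails: no sinks are defined
{code}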

> Make 'print()' output on the client command line, rather than on the task 
> manager sysout
> 
>
> Key: FLINK-1418
> URL: https://issues.apache.org/jira/browse/FLINK-1418
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>
> Right now, the {{print()}} command prints inside the data sinks where the 
> code runs. It should pull data back to the client and print it there.





[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23172541
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * This base class allows to use the MiniYARNCluster.
+ * The cluster is re-used for all tests.
+ *
+ * This class is located in a different package which is build after 
flink-dist. This way,
+ * we can use the YARN uberjar of flink to start a Flink YARN session.
+ */
+public abstract class YarnTestBase {
+   private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionFIFOIT.class);
+
+   private final static PrintStream originalStdout = System.out;
+   private final static PrintStream originalStderr = System.err;
+
+
+   // Temp directory which is deleted after the unit test.
+   private static TemporaryFolder tmp = new TemporaryFolder();
+
+   protected static MiniYARNCluster yarnCluster = null;
+
+   protected static File flinkUberjar;
+
+   protected static final Configuration yarnConfiguration;
+   static {
+   yarnConfiguration = new YarnConfiguration();
+   
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
512);
+   
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 
4096); // 4096 is the available memory anyways
+   
yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, 
true);
+   
yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
 true);
+   yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 
2);
+   
yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
+   
yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
+   
yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+   yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // 
memory is overwritten in the MiniYARNCluster.
+   // so we have to change the number of cores for testing.
+   }
+
+   // This code is taken from: http://stackoverflow.com/a/7201825/568695
+   // it changes the environment variables of this JVM. Use only for 
testing purposes!
+   private static void setEnv(Map<String, String> newenv) {
+   try {
+   Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
+   Field theEnvironmentField = 
processEnvironmentClass.getDeclaredField("theEnvironment");
+   theEnvironmentField.setAccessible(true)

[jira] [Commented] (FLINK-1098) flatArray() operator that converts arrays to elements

2015-01-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282699#comment-14282699
 ] 

Fabian Hueske commented on FLINK-1098:
--

Timo proposed to add a {{collectEach()}} method to {{Collector}}. 
That would not change {{DataSet}} and would not require a new function 
interface or operator.
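
A rough sketch of that proposal ({{collectEach()}} is not existing API; the 
shape below is illustrative):

{code}
public interface Collector<T> {

  void collect(T record);

  void close();

  // proposed addition: emit every element of an array with a single call
  void collectEach(T[] records);
}
{code}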

> flatArray() operator that converts arrays to elements
> -
>
> Key: FLINK-1098
> URL: https://issues.apache.org/jira/browse/FLINK-1098
> Project: Flink
>  Issue Type: New Feature
>Reporter: Timo Walther
>Priority: Minor
>
> It would be great to have an operator that converts e.g. from String[] to 
> String. Actually, it is just a flatMap over the elements of an array.
> A typical use case is a WordCount where we then could write:
> {code}
> text
> .map((line) -> line.toLowerCase().split("\\W+"))
> .flatArray()
> .map((word) -> new Tuple2<String, Integer>(word, 1))
> .groupBy(0)
> .sum(1);
> {code}





[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23172407
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---
@@ -0,0 +1,379 @@
[quoted YarnTestBase.java diff omitted; identical to the excerpt quoted above, 
truncated before the review comment]

[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23172490
  
--- Diff: flink-yarn-tests/src/test/resources/log4j-test.properties ---
@@ -0,0 +1,25 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+log4j.rootLogger=INFO, file
--- End diff --

Do we want to have INFO log output in the yarn tests? Isn't it a little bit 
too verbose?




[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...

2015-01-19 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/304#issuecomment-70526481
  
Maybe I'm missing something, but in my understanding, if a user is using 
`DateTime` in a POJO, we would be able to serialize that efficiently with Kryo 
(without writing any class name for it).

I agree that if a user is using the `Instant` class of JodaTime, it is either 
not working or slow. But that's fine, because most users are probably using 
`DateTime`. Those users would have to take care of finding or implementing a 
serializer for it.
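
A minimal sketch of the kind of serializer such a user would have to provide, 
assuming the plain Kryo 2.x API (hypothetical, not part of this PR):

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.joda.time.Instant;

// A Joda Instant is fully defined by its epoch millis, so the
// serializer only needs to write a single long.
public class InstantSerializer extends Serializer<Instant> {

    @Override
    public void write(Kryo kryo, Output output, Instant instant) {
        output.writeLong(instant.getMillis());
    }

    @Override
    public Instant read(Kryo kryo, Input input, Class<Instant> type) {
        return new Instant(input.readLong());
    }
}
```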




[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23172386
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---
@@ -0,0 +1,379 @@
[quoted YarnTestBase.java diff omitted; identical to the excerpt quoted above, 
truncated before the review comment]

[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23172243
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---
@@ -0,0 +1,379 @@
[quoted YarnTestBase.java diff omitted; identical to the excerpt quoted above, 
truncated before the review comment]

[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23172158
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---
@@ -0,0 +1,379 @@
[quoted YarnTestBase.java diff omitted; identical to the excerpt quoted above, 
down to the logger line the comment below refers to]
+public abstract class YarnTestBase {
+   private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionFIFOIT.class);
--- End diff --

https://briankoberlein.com/wp-content/uploads/copypasta.jpg




[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...

2015-01-19 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/304#issuecomment-70525444
  
JodaTime alone has about 20 classes. Serializers are not available for all of 
them, and it would be quite a bit of work to add all of those.

Adding a big set of common cases means registering 50+ serializers with Kryo 
every time we create a Kryo instance.




[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23172012
  
--- Diff: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---
@@ -0,0 +1,379 @@
[quoted YarnTestBase.java diff omitted; identical to the excerpt quoted above, 
down to the logger line the comment below refers to]
+public abstract class YarnTestBase {
+   private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionFIFOIT.class);
--- End diff --

Why retrieving a logger for YARNSessionFIFOIT.class?




[GitHub] flink pull request: [FLINK-1372] [runtime] Fix akka logging

2015-01-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/321#discussion_r23171851
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---
@@ -204,12 +192,16 @@ object AkkaUtils {
   |
   |  loggers = ["akka.event.slf4j.Slf4jLogger"]
   |  logger-startup-timeout = 30s
-  |  loglevel = "WARNING"
-  |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+  |  loglevel = "DEBUG"
--- End diff --

So the level is now DEBUG by default?




[jira] [Commented] (FLINK-1400) In local mode, the default TaskManager won't listen on the data port.

2015-01-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282690#comment-14282690
 ] 

Stephan Ewen commented on FLINK-1400:
-

Good point. The web frontend could write {{-1}} or {{N/A}} for the data port. 
If you want, open a separate issue for that, and if you want to make a patch, 
let us know.

> In local mode, the default TaskManager won't listen on the data port.
> -
>
> Key: FLINK-1400
> URL: https://issues.apache.org/jira/browse/FLINK-1400
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.9
> Environment: Ubuntu 14.04 LTS
>Reporter: Sergey Dudoladov
>Priority: Minor
>
>  The Task Manager automatically started by the Job Manager (JobManager.scala, 
> appr. line  470)  in the local mode does not listen on the dataport. 
> To reproduce:
> 1) Start Flink via ./start-local.sh
> 2) Look up the data port number on localhost:8081 -> "Task Managers" tab
> 3) sudo netstat -taupen | grep "dataport_number "
>  
> Or  start the second Task Manager and run  Flink with the degree of 
> parallelism 2 (assuming one slot per Task Manager)
> 4) ./flink run -p 2 ...
> Task Managers started via ./taskmanager.sh work fine.





[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23171500
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java ---
@@ -86,7 +86,7 @@ public static InetAddress 
resolveAddress(InetSocketAddress jobManagerAddress) th
case SLOW_CONNECT:
boolean correct = 
tryToConnect(i, jobManagerAddress, strategy.getTimeout());
if (correct) {
-   
LOG.info("Determined " + i + " as the TaskTracker's own IP address");
+   
LOG.info("Determined " + i + " as the machine's own IP address");
--- End diff --

Same here with the logging string.




[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23171462
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java ---
@@ -76,7 +76,7 @@ public static InetAddress 
resolveAddress(InetSocketAddress jobManagerAddress) th
case ADDRESS:
if 
(hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
if 
(tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
-   
LOG.info("Determined " + i + " as the TaskTracker's own IP address");
+   
LOG.info("Determined " + i + " as the machine's own IP address");
--- End diff --

Better to use `LOG.info("Determined {} as the machine's own IP address.", i)`. 
That way, the message string is only constructed if the INFO level is actually 
enabled.
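
For comparison, assuming a standard SLF4J logger:

```java
// Eager: the concatenated string is built on every call, even when INFO is off.
LOG.info("Determined " + i + " as the machine's own IP address");

// Lazy: the message is only formatted if the INFO level is enabled.
LOG.info("Determined {} as the machine's own IP address.", i);
```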




[GitHub] flink pull request: [FLINK-1391] Add support for using Avro-POJOs ...

2015-01-19 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/323

[FLINK-1391] Add support for using Avro-POJOs and Avro types with Kryo

With this change, users can use POJOs generated from Avro schemas with 
Flink, either with the entire POJO handled as a GenericType or with the POJO 
serializer.

I also added support for using `GenericData.Record`, which is a container 
that checks the types against a schema. It is extremely inefficient to use 
that; I guess the only realistic use case for it is local prototyping.
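
For illustration, this is what such a record looks like in plain Avro (the 
schema and field name here are made up):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class GenericRecordExample {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Word\","
                + "\"fields\":[{\"name\":\"word\",\"type\":\"string\"}]}");

        // Every put() is resolved against the schema; unknown field
        // names are rejected at runtime.
        GenericData.Record record = new GenericData.Record(schema);
        record.put("word", "flink");
    }
}
```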

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink1391-rebased

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/323.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #323


commit 0e003c3c08537f03698ca122b4096d9512bfcad6
Author: Robert Metzger 
Date:   2015-01-12T20:11:09Z

[FLINK-1391] Add support for using Avro-POJOs and Avro types with Kryo






[jira] [Commented] (FLINK-1098) flatArray() operator that converts arrays to elements

2015-01-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282679#comment-14282679
 ] 

Stephan Ewen commented on FLINK-1098:
-

Can we do this with a special flatMapFunction, rather than with extra functions 
on the data set? Something like
{code}
public abstract class FlatMapArrayFunction<I, O> extends RichFlatMapFunction<I, O> {

  @Override
  public void flatMap(I in, Collector<O> out) throws Exception {
    O[] result = flatMapArray(in);
    for (O element : result) {
      out.collect(element);
    }
  }

  public abstract O[] flatMapArray(I in) throws Exception;
}
{code}
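
Usage in the WordCount from the description would then look roughly like this 
(hypothetical):

{code}
DataSet<String> words = text.flatMap(new FlatMapArrayFunction<String, String>() {
  @Override
  public String[] flatMapArray(String line) {
    return line.toLowerCase().split("\\W+");
  }
});
{code}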

This could go into the {{lib}} of common utility functions.

For a more functional approach: I think in functional programming, the function 
is called {{flatten()}} and should work on a {{DataSet}} of sequences and a 
{{DataSet}} of arrays.

In Scala, this is nice and easy: we can add an implicit conversion for an array 
or list dataset, with a guard so that the conversion only happens if the element 
type is a list or an array.

For Java, it is not that nice. Adding the special functions again bloats the 
API, so we again need to make a careful tradeoff.

> flatArray() operator that converts arrays to elements
> -
>
> Key: FLINK-1098
> URL: https://issues.apache.org/jira/browse/FLINK-1098
> Project: Flink
>  Issue Type: New Feature
>Reporter: Timo Walther
>Priority: Minor
>
> It would be great to have an operator that converts e.g. from String[] to 
> String. Actually, it is just a flatMap over the elements of an array.
> A typical use case is a WordCount where we then could write:
> {code}
> text
> .map((line) -> line.toLowerCase().split("\\W+"))
> .flatArray()
> .map((word) -> new Tuple2<String, Integer>(word, 1))
> .groupBy(0)
> .sum(1);
> {code}





[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23170666
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -860,8 +910,50 @@ protected Properties getYarnProperties() throws 
IOException {
return yarnProperties;
}

-   protected Client getClient(CommandLine line, ClassLoader classLoader) 
throws IOException {
-   return new Client(getJobManagerAddress(line), 
getGlobalConfiguration(), classLoader);
+   protected Client getClient(CommandLine line, ClassLoader classLoader, 
String programName) throws IOException {
+   String jmAddrString = getJobManagerAddressString(line);
+   InetSocketAddress jobManagerAddress = null;
+   if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) {
+   System.out.println("YARN cluster mode detected. 
Switching Log4j output to console");
+   LogManager.getRootLogger().addAppender(new 
ConsoleAppender(new PatternLayout(DEFAULT_LOG4J_PATTERN_LAYOUT)));
--- End diff --

Okay. Let me see if I can come up with a workaround.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23170694
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Class handling the command line interface to the YARN session.
+ */
+public class FlinkYarnSessionCli {
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkYarnSessionCli.class);
+
+   // Constants   
-
+
+   private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
+   public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
+   public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
+
+
+   private static final int CLIENT_POLLING_INTERVALL = 3;
+
+
+   // Command Line argument options 
-
+   // the prefix transformation is used by the CliFrontend static 
constructor.
+   private final Option QUERY;
+   // --- or ---
+   private final Option QUEUE;
+   private final Option SHIP_PATH;
+   private final Option FLINK_JAR;
+   private final Option JM_MEMORY;
+   private final Option TM_MEMORY;
+   private final Option CONTAINER;
+   private final Option SLOTS;
+
+   /**
+* Dynamic properties allow the user to specify additional 
configuration values with -D, such as
+*  -Dfs.overwrite-files=true  
-Dtaskmanager.network.numberOfBuffers=16368
+*/
+   private final Option DYNAMIC_PROPERTIES;
+
+   private AbstractFlinkYarnCluster yarnCluster = null;
+
+   public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
+   QUERY = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
+   QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", 
true, "Specify YARN queue.");
+   SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", 
true, "Ship files in the specified directory (t for transfer)");
+   FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", 
true, "Path to Flink jar file");
+   JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + 
"jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+   TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + 
"taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
+   CONTAINER = new Option(shortPrefix + "n", longPrefix + 
"container", true, "Number of YARN container to allocate (=Number of Task 
Managers)");
+   SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", 
true, "Number of slots per TaskManager");
+   DYNAMIC_PROP

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-01-19 Thread Alexander Alexandrov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282660#comment-14282660
 ] 

Alexander Alexandrov commented on FLINK-1297:
-

We have a new master's student (Fridtjof Sander) who is going to work with me on 
this issue.

I've created a feature branch where we will push the WIP on that issue to make 
it available for discussion:

https://github.com/stratosphere/flink/tree/FLINK-1297

As a first step, Fridtjof is going to extend the TaskConfig so that we can 
configure which statistics to track for each produced result. Once this is in 
place, we will continue with a prototype implementation of the logic that tracks 
these statistics in the TaskManagers, as Ufuk suggested.

We will need expert guidance on that, so be ready to answer a lot of questions 
:) Hopefully, a first prototype of that functionality will be ready for 0.9.
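
To give a rough idea of the direction (the names below are made up for
illustration; this is not the code on the feature branch):
{code}
// Illustrative only: the kind of per-result switch the TaskConfig could carry.
public enum ResultStatistic {
    MIN, MAX, COUNT, COUNT_DISTINCT
}

public class ResultStatisticsConfig implements java.io.Serializable {
    private final java.util.EnumSet<ResultStatistic> enabled;

    public ResultStatisticsConfig(java.util.EnumSet<ResultStatistic> enabled) {
        this.enabled = enabled;
    }

    // Queried by the producing task for each statistic it could track.
    public boolean shouldTrack(ResultStatistic statistic) {
        return enabled.contains(statistic);
    }
}
{code}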

> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback from the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-01-19 Thread Alexander Alexandrov (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Alexandrov updated FLINK-1297:

Due Date: 15/Mar/15  (was: 31/Jan/15)

> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback from the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23169866
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Class handling the command line interface to the YARN session.
+ */
+public class FlinkYarnSessionCli {
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkYarnSessionCli.class);
+
+   // Constants   
-
+
+   private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
+   public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
+   public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
+
+
+   private static final int CLIENT_POLLING_INTERVALL = 3;
+
+
+   // Command Line argument options 
-
+   // the prefix transformation is used by the CliFrontend static 
constructor.
+   private final Option QUERY;
+   // --- or ---
+   private final Option QUEUE;
+   private final Option SHIP_PATH;
+   private final Option FLINK_JAR;
+   private final Option JM_MEMORY;
+   private final Option TM_MEMORY;
+   private final Option CONTAINER;
+   private final Option SLOTS;
+
+   /**
+* Dynamic properties allow the user to specify additional 
configuration values with -D, such as
+*  -Dfs.overwrite-files=true  
-Dtaskmanager.network.numberOfBuffers=16368
+*/
+   private final Option DYNAMIC_PROPERTIES;
+
+   private AbstractFlinkYarnCluster yarnCluster = null;
+
+   public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
+   QUERY = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
+   QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", 
true, "Specify YARN queue.");
+   SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", 
true, "Ship files in the specified directory (t for transfer)");
+   FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", 
true, "Path to Flink jar file");
+   JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + 
"jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+   TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + 
"taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
+   CONTAINER = new Option(shortPrefix + "n", longPrefix + 
"container", true, "Number of YARN container to allocate (=Number of Task 
Managers)");
+   SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", 
true, "Number of slots per TaskManager");
+   DYNAMIC_

[GitHub] flink pull request: [FLINK-1392] Add Kryo serializer for Protobuf

2015-01-19 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/322

[FLINK-1392] Add Kryo serializer for Protobuf

I've checked the added dependencies: they do not override any versions, and 
no transitive dependencies are added.
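
For context, registering a protoc-generated class with Kryo looks roughly like
this (a sketch; `AddressBookProtos.Person` is a placeholder, and the
serializer's package name is stated from memory, so treat it as an assumption):

```java
import com.esotericsoftware.kryo.Kryo;
import de.javakaffee.kryoserializers.protobuf.ProtobufSerializer;

public class ProtobufKryoExample {
    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        // The serializer round-trips messages via toByteArray()/parseFrom().
        kryo.register(AddressBookProtos.Person.class, new ProtobufSerializer());
    }
}
```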

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink1392

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/322.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #322


commit ed629e3e23001a0761d116d8c1151a65d88501eb
Author: Robert Metzger 
Date:   2015-01-13T09:21:29Z

[FLINK-1392] Add Kryo serializer for Protobuf




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1406] update Flink compatibility notice

2015-01-19 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/314#issuecomment-70519951
  
Good to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23169293
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -860,8 +910,50 @@ protected Properties getYarnProperties() throws 
IOException {
return yarnProperties;
}

-   protected Client getClient(CommandLine line, ClassLoader classLoader) 
throws IOException {
-   return new Client(getJobManagerAddress(line), 
getGlobalConfiguration(), classLoader);
+   protected Client getClient(CommandLine line, ClassLoader classLoader, 
String programName) throws IOException {
+   String jmAddrString = getJobManagerAddressString(line);
+   InetSocketAddress jobManagerAddress = null;
+   if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) {
+   System.out.println("YARN cluster mode detected. 
Switching Log4j output to console");
+   LogManager.getRootLogger().addAppender(new 
ConsoleAppender(new PatternLayout(DEFAULT_LOG4J_PATTERN_LAYOUT)));
--- End diff --

Do we have to hardwire log4j into our code? That contradicts slf4j's 
purpose. Would be great if we could get rid of that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

2015-01-19 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/311#issuecomment-70517390
  
IMO, users should only be allowed to set semantic properties through field 
expression strings. There should be no need to implement one's own 
SemanticProperties class or to manually set the forwarded fields in 
SemanticProperties objects. Setting these fields is not trivial, because field 
indices need to be flattened and the types of source and target fields should 
be checked for compatibility. From my point of view, this is clearly a developer 
API. Whoever gets in touch with it should not have problems dealing with input 
ids or implementing the backwards access.

The {{hasFieldForwardInfomation()}} method can be removed though. Will do that.
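
For reference, with expression strings the declaration boils down to an
annotation like this (a sketch against the reworked API; the mapper itself is
made up):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// f0 stays in place; input field f2 is forwarded to output field f1.
@ForwardedFields("f0; f2->f1")
public class ProjectingMapper
        implements MapFunction<Tuple3<Integer, String, Long>, Tuple2<Integer, Long>> {
    @Override
    public Tuple2<Integer, Long> map(Tuple3<Integer, String, Long> in) {
        return new Tuple2<Integer, Long>(in.f0, in.f2);
    }
}
```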


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1372] [runtime] Fix akka logging

2015-01-19 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/321

[FLINK-1372] [runtime] Fix akka logging

Fixes the logging settings. The logging is now exclusively controlled by the 
logging properties provided to the system; the akka.loglevel config parameter 
has been removed.

The JobManager and TaskManager now log their startup settings properly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixAkkaLogging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/321.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #321


commit 94d96cb6574ada07296319f19b05283055f3029b
Author: Till Rohrmann 
Date:   2015-01-19T15:13:52Z

[FLINK-1372] [runtime] Fixes logging settings. The logging is now 
exclusively controlled by the logging properties provided to the system. 
Removes akka.loglevel config parameter.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-1296) Add support for very large record for sorting

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-1296:
--
Fix Version/s: (was: 0.8)
   0.9

> Add support for very large record for sorting
> -
>
> Key: FLINK-1296
> URL: https://issues.apache.org/jira/browse/FLINK-1296
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 0.8
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.9
>
>
> Currently, very large records (multiple hundreds of megabytes) can break the 
> sorter if they overflow the sort buffer.
> Furthermore, if a merge of those records is attempted, pulling multiple of 
> them into memory concurrently can exhaust the machine's memory.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-938) Change start-cluster.sh script so that users don't have to configure the JobManager address

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-938:
-
Fix Version/s: (was: 0.8)
   0.9

> Change start-cluster.sh script so that users don't have to configure the 
> JobManager address
> ---
>
> Key: FLINK-938
> URL: https://issues.apache.org/jira/browse/FLINK-938
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Mingliang Qi
>Priority: Minor
> Fix For: 0.9
>
>
> To improve the user experience, Flink should not require users to configure 
> the JobManager's address on a cluster.
> In combination with FLINK-934, this would allow running Flink with decent 
> performance on a cluster without setting a single configuration value.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-967) Make intermediate results a first-class citizen in the JobGraph

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-967:
-
Fix Version/s: (was: 0.8)
   0.9

> Make intermediate results a first-class citizen in the JobGraph
> ---
>
> Key: FLINK-967
> URL: https://issues.apache.org/jira/browse/FLINK-967
> Project: Flink
>  Issue Type: New Feature
>  Components: JobManager, TaskManager
>Affects Versions: 0.6-incubating
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.9
>
>
> In order to add incremental plan rollout to the system, we need to make 
> intermediate results a first-class citizen in the job graph and scheduler.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1303) HadoopInputFormat does not work with Scala API

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-1303:
--
Fix Version/s: (was: 0.8)
   0.9

> HadoopInputFormat does not work with Scala API
> --
>
> Key: FLINK-1303
> URL: https://issues.apache.org/jira/browse/FLINK-1303
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.9
>
>
> It fails because the HadoopInputFormat uses the Flink Tuple2 type. For this, 
> type extraction fails at runtime.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-1297:
--
Fix Version/s: (was: 0.8)
   0.9

> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback from the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1240) We cannot use sortGroup on a global reduce

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-1240:
--
Fix Version/s: (was: 0.8)
   0.9

> We cannot use sortGroup on a global reduce
> --
>
> Key: FLINK-1240
> URL: https://issues.apache.org/jira/browse/FLINK-1240
> Project: Flink
>  Issue Type: Improvement
>Reporter: Aljoscha Krettek
>Priority: Minor
> Fix For: 0.9
>
>
> This is only an API problem, I hope.
> I also know that this is potentially a very bad idea because everything must 
> be sorted on one node. In some cases, such as sorted first-n, this would make 
> sense, though, since there we use a combiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-937) Change the YARN Client to allocate all cluster resources, if no argument given

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-937:
-
Fix Version/s: (was: 0.8)
   0.9

> Change the YARN Client to allocate all cluster resources, if no argument given
> --
>
> Key: FLINK-937
> URL: https://issues.apache.org/jira/browse/FLINK-937
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN Client
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Minor
> Fix For: 0.9
>
>
> In order to further improve the user experience, I would like to change the 
> YARN client's behavior to allocate as many cluster resources as possible, if 
> the user does not specify differently.
> The majority of users have exclusive access to the cluster.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-987) Extend TypeSerializers and -Comparators to work directly on Memory Segments

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-987:
-
Fix Version/s: (was: 0.8)
   0.9

> Extend TypeSerializers and -Comparators to work directly on Memory Segments
> ---
>
> Key: FLINK-987
> URL: https://issues.apache.org/jira/browse/FLINK-987
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 0.6-incubating
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
> Fix For: 0.9
>
>
> As per discussion with [~till.rohrmann], [~uce], [~aljoscha], we suggest to 
> change the way that the TypeSerializers/Comparators and 
> DataInputViews/DataOutputViews work.
> The goal is to allow more flexibility in the construction of the binary 
> representation of data types, and to allow partial deserialization of 
> individual fields. Both are currently prohibited by the fact that the 
> abstraction of the memory (into which the data goes) is a stream abstraction 
> ({{DataInputView}}, {{DataOutputView}}).
> An idea is to offer a random-access buffer like view for construction and 
> random-access deserialization, as well as various methods to copy elements in 
> a binary fashion between such buffers and streams.
> A possible set of methods for the {{TypeSerializer}} could be:
> {code}
> long serialize(T record, TargetBuffer buffer);
>   
> T deserialize(T reuse, SourceBuffer source);
>   
> void ensureBufferSufficientlyFilled(SourceBuffer source);
>   
> <X> X deserializeField(X reuse, int logicalPos, SourceBuffer buffer);
>   
> int getOffsetForField(int logicalPos, int offset, SourceBuffer buffer);
>   
> void copy(DataInputView in, TargetBuffer buffer);
>   
> void copy(SourceBuffer buffer, DataOutputView out);
>   
> void copy(DataInputView source, DataOutputView target);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1278) Remove the Record special code paths

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-1278:
--
Fix Version/s: (was: 0.8)
   0.9

> Remove the Record special code paths
> 
>
> Key: FLINK-1278
> URL: https://issues.apache.org/jira/browse/FLINK-1278
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 0.8
>Reporter: Stephan Ewen
>Assignee: Kostas Tzoumas
> Fix For: 0.9
>
>
> There are some legacy Record code paths in the runtime, which are often 
> not kept in sync and cause errors if people actually use Records.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1129) The Plan Visualizer Cuts off the Lower Part of Certain Operators

2015-01-19 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-1129:
--
Fix Version/s: (was: 0.8)
   0.9

> The Plan Visualizer Cuts off the Lower Part of Certain Operators
> ---
>
> Key: FLINK-1129
> URL: https://issues.apache.org/jira/browse/FLINK-1129
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.9
>
> Attachments: screenshot-1.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1303) HadoopInputFormat does not work with Scala API

2015-01-19 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282600#comment-14282600
 ] 

Robert Metzger commented on FLINK-1303:
---

What's the status of this issue?

> HadoopInputFormat does not work with Scala API
> --
>
> Key: FLINK-1303
> URL: https://issues.apache.org/jira/browse/FLINK-1303
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.8
>
>
> It fails because the HadoopInputFormat uses the Flink Tuple2 type. For this, 
> type extraction fails at runtime.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1372) TaskManager and JobManager do not log startup settings any more

2015-01-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282593#comment-14282593
 ] 

Till Rohrmann commented on FLINK-1372:
--

It always depends on the location from which you log. If you log the startup 
info from within an actor, then you should use the actor's logging object. But 
I think that we can get rid of the akka.loglevel config parameter so that the 
logging will only be controlled by the logging properties file. 

> TaskManager and JobManager do not log startup settings any more
> ---
>
> Key: FLINK-1372
> URL: https://issues.apache.org/jira/browse/FLINK-1372
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
> Fix For: 0.9
>
>
> In prior versions, the jobmanager and taskmanager logged a lot of startup 
> options:
>  - Environment
>  - ports
>  - memory configuration
>  - network configuration
> Currently, they log very little. We should add the logging back in.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1372) TaskManager and JobManager do not log startup settings any more

2015-01-19 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282585#comment-14282585
 ] 

Ufuk Celebi commented on FLINK-1372:


Thanks for the explanation and for looking into it. :-) Do the points apply to 
startup log info (e.g. the JVM warning or the points brought up by Stephan in 
this issue) as well?

> TaskManager and JobManager do not log startup settings any more
> ---
>
> Key: FLINK-1372
> URL: https://issues.apache.org/jira/browse/FLINK-1372
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
> Fix For: 0.9
>
>
> In prior versions, the jobmanager and taskmanager logged a lot of startup 
> options:
>  - Environment
>  - ports
>  - memory configuration
>  - network configuration
> Currently, they log very little. We should add the logging back in.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1372) TaskManager and JobManager do not log startup settings any more

2015-01-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282577#comment-14282577
 ] 

Till Rohrmann commented on FLINK-1372:
--

I don't think that it is good to directly use the logger of the companion 
object in an actor scenario. First of all, you have synchronous logging calls, 
so multiple actors that access the same logger will block each other. 
Furthermore, you don't get the information about which actor actually made the 
call, since you only see the class name and the calling thread. If you have 
multiple actors of the same class in one JVM, they all look the same to you.

By using Akka's LoggingAdapter we get asynchronous logging with mapped 
diagnostic context (MDC) information such as the calling thread, the calling 
actor and other details. Admittedly, the MDC information cannot be processed by 
log4j, but we still have the advantage of non-blocking logging. By the way, the 
MDC support is the reason why I wanted to switch to Logback as the default 
logging backend.

I'll try to fix the issue right now.
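
For illustration, the LoggingAdapter pattern looks like this with Akka's Java
API (a generic sketch, not Flink code):
{code}
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class MyActor extends UntypedActor {
    // Asynchronous logging; the adapter knows which actor instance it belongs to.
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    @Override
    public void onReceive(Object message) {
        log.info("Received: {}", message);
    }
}
{code}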

> TaskManager and JobManager do not log startup settings any more
> ---
>
> Key: FLINK-1372
> URL: https://issues.apache.org/jira/browse/FLINK-1372
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
> Fix For: 0.9
>
>
> In prior versions, the jobmanager and taskmanager logged a lot of startup 
> options:
>  - Environment
>  - ports
>  - memory configuration
>  - network configuration
> Currently, they log very little. We should add the logging back in.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

2015-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/311#discussion_r23163896
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java
 ---
@@ -56,218 +56,135 @@
 

public DualInputSemanticProperties() {
-   init();
+   this.fieldMapping1 = new HashMap<Integer, FieldSet>();
+   this.fieldMapping2 = new HashMap<Integer, FieldSet>();
+   this.readFields1 = null;
+   this.readFields2 = null;
}
-   
-   /**
-* Adds, to the existing information, a field that is forwarded directly
-* from the source record(s) in the first input to the destination
-* record(s).
-* 
-* @param sourceField the position in the source record(s) from the 
first input
-* @param destinationField the position in the destination record(s)
-*/
-   public void addForwardedField1(int sourceField, int destinationField) {
-   FieldSet old = this.forwardedFields1.get(sourceField);
-   if (old == null) {
-   old = FieldSet.EMPTY_SET;
+
+   @Override
+   public FieldSet getTargetFields(int input, int sourceField) {
+
+   if (input != 0 && input != 1) {
+   throw new IndexOutOfBoundsException();
+   } else if (input == 0) {
+
+   return fieldMapping1.get(sourceField);
+   } else {
+   return fieldMapping2.get(sourceField);
}
-   
-   FieldSet fs = old.addField(destinationField);
-   this.forwardedFields1.put(sourceField, fs);
}
-   
-   /**
-* Adds, to the existing information, a field that is forwarded directly
-* from the source record(s) in the first input to multiple fields in
-* the destination record(s).
-* 
-* @param sourceField the position in the source record(s)
-* @param destinationFields the position in the destination record(s)
-*/
-   public void addForwardedField1(int sourceField, FieldSet 
destinationFields) {
-   FieldSet old = this.forwardedFields1.get(sourceField);
-   if (old == null) {
-   old = FieldSet.EMPTY_SET;
+
+   @Override
+   public int getSourceField(int input, int targetField) {
+   Map<Integer, FieldSet> fieldMapping;
+
+   if (input != 0 && input != 1) {
+   throw new IndexOutOfBoundsException();
+   } else if (input == 0) {
+   fieldMapping = fieldMapping1;
+   } else {
+   fieldMapping = fieldMapping2;
+   }
+
+   for (int sourceField : fieldMapping.keySet()) {
--- End diff --

Good point


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

2015-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/311#discussion_r23163889
  
--- Diff: 
flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java
 ---
@@ -86,8 +87,8 @@ public void 
computeInterestingPropertiesForInputs(CostEstimator estimator) {
}
 
@Override
-   public boolean isFieldConstant(int input, int fieldNumber) {
-   return false;
+   public SemanticProperties getSemanticProperties() {
+   return null;
--- End diff --

Yes, will do that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1376] [runtime] Add proper shared slot ...

2015-01-19 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/318#discussion_r23163714
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class represents a shared slot. A shared slot can have multiple
+ * {@link org.apache.flink.runtime.instance.SimpleSlot} instances within 
itself. This allows to
+ * schedule multiple tasks simultaneously, enabling Flink's streaming 
capabilities.
+ *
+ * IMPORTANT: This class contains no synchronization. Thus, the caller has 
to guarantee proper
+ * synchronization. In the current implementation, all concurrently 
modifying operations are
+ * passed through a {@link SlotSharingGroupAssignment} object which is 
responsible for
+ * synchronization.
+ *
+ */
+public class SharedSlot extends Slot {
+
+   private final SlotSharingGroupAssignment assignmentGroup;
+
+   private final Set subSlots;
+
+   public SharedSlot(JobID jobID, Instance instance, int slotNumber,
+   SlotSharingGroupAssignment 
assignmentGroup, SharedSlot parent,
+   AbstractID groupID) {
+   super(jobID, instance, slotNumber, parent, groupID);
+
+   this.assignmentGroup = assignmentGroup;
+   this.subSlots = new HashSet();
+   }
+
+   public Set getSubSlots() {
+   return subSlots;
+   }
+
+   /**
+* Removes the simple slot from the {@link 
org.apache.flink.runtime.instance.SharedSlot}. Should
+* only be called through the
+* {@link 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment} 
attribute
+* assignmnetGroup.
+*
+* @param slot slot to be removed from the set of sub slots.
+* @return Number of remaining sub slots
+*/
+   public int freeSubSlot(Slot slot){
+   if(!subSlots.remove(slot)){
+   throw new IllegalArgumentException("Wrong shared slot 
for sub slot.");
+   }
+
+   return subSlots.size();
+   }
+
+   @Override
+   public int getNumberLeaves() {
+   int result = 0;
+
+   for(Slot slot: subSlots){
+   result += slot.getNumberLeaves();
+   }
+
+   return result;
+   }
+
+   @Override
+   public void cancel() {
+   // Guarantee that the operation is only executed once
+   if (markCancelled()) {
+   assignmentGroup.releaseSharedSlot(this);
+   }
+   }
+
+   /**
+* Release this shared slot. In order to do this:
+*
+* 1. Cancel and release all sub slots atomically with respect to the 
assigned assignment group.
+* 2. Set the state of the shared slot to be cancelled.
+* 3. Dispose the shared slot (returning the slot to the instance).
+*
+* After cancelAndReleaseSubSlots, the shared slot is marked to be 
dead. This prevents further
+* sub slot creation by the scheduler.
+*/
+   @Override
+   public void releaseSlot() {
+   assignmentGroup.releaseSharedSlot(this);
+   }
+
+   /**
+* Creates a new sub slot if the slot is not dead, yet. This method 
should only be called from
+* the assignment group instance to guarantee synchronization.
+*
+* @param jID id to identify tasks which can be deployed in this sub 
slot
+* @return new sub slot if the shared slot is still alive, otherwise 
null
+*/
+   public Simpl

[jira] [Commented] (FLINK-1372) TaskManager and JobManager do not log startup settings any more

2015-01-19 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282554#comment-14282554
 ] 

Ufuk Celebi commented on FLINK-1372:


[~till.rohrmann] do you have something to add? Could we just use the logger of 
the companion object? Let's fix this today.

> TaskManager and JobManager do not log startup settings any more
> ---
>
> Key: FLINK-1372
> URL: https://issues.apache.org/jira/browse/FLINK-1372
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
> Fix For: 0.9
>
>
> In prior versions, the jobmanager and taskmanager logged a lot of startup 
> options:
>  - Environment
>  - ports
>  - memory configuration
>  - network configuration
> Currently, they log very little. We should add the logging back in.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1409) Connected datastream functionality broken since the introduction of intermediate results

2015-01-19 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282536#comment-14282536
 ] 

Ufuk Celebi commented on FLINK-1409:


Fixed in 95fece8. Travis is currently running into some timeouts. I ran all 
tests locally.

[Sorry for the delay. Friday and the weekend were pretty packed.]

> Connected datastream functionality broken since the introduction of 
> intermediate results
> 
>
> Key: FLINK-1409
> URL: https://issues.apache.org/jira/browse/FLINK-1409
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Gyula Fora
>
> The connected data stream functionality which allows joint transformations on 
> two data streams of arbitrary type is broken since Ufuk's commit which 
> introduces the intermediate results.
> The problem is most likely in the CoRecordReader which should allow 
> nonblocking read from inputs with different types. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1406] update Flink compatibility notice

2015-01-19 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/314#issuecomment-70492395
  
:+1:


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1406] update Flink compatibility notice

2015-01-19 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/314#issuecomment-70490230
  
@fhueske I updated the documentation to include the link.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1165] No createCollectionsEnvironment i...

2015-01-19 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/320#issuecomment-70487388
  
Thanks :) Personally, I'm fine with both variants, but if the Scala variant 
is already called Collections environment, I would go for the same name and 
leave the Scala API as it is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1165] No createCollectionsEnvironment i...

2015-01-19 Thread ajaybhat
Github user ajaybhat commented on the pull request:

https://github.com/apache/flink/pull/320#issuecomment-70484376
  
Fixed the typos.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-1003) Spread out scheduling strategy

2015-01-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-1003:


Assignee: (was: Till Rohrmann)

> Spread out scheduling strategy
> --
>
> Key: FLINK-1003
> URL: https://issues.apache.org/jira/browse/FLINK-1003
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>
> Currently the Flink scheduler tries to fill one instance completely before 
> the tasks are deployed to another instance. This is a good behaviour in 
> multi-user and multi-job scenarios but it wastes resources if one wants to 
> use the complete cluster. Therefore, another scheduling strategy where the 
> load among the different instances is kept balanced might be useful. This 
> spread out strategy will deploy the tasks such that the overall work is 
> equally distributed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1003] Added spread out scheduling strat...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/60


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1003] Added spread out scheduling strat...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/60#issuecomment-70477464
  
I agree, too much has changed since the PR was opened.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1419) DistributedCache doesn't preserve files for subsequent operations

2015-01-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282393#comment-14282393
 ] 

Fabian Hueske commented on FLINK-1419:
--

I think it is fine to make files in the DC immutable (read-only). An operator 
that wants to modify files can create a local writable copy.

Files in the DC should only be copied once per TM and stay until the job is 
finished, IMO.
The question is who initiates the copy process. The first task that requires 
the file? In that case, all other tasks need to recognize that the file is 
being copied by another task and wait until the copying is completed.
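
For the "first task copies, the others wait" coordination, the usual pattern is
a per-TM map of futures; a generic sketch (not Flink code, {{copyToLocal}} is an
assumed helper):
{code}
import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.FutureTask;

public class CopyOnceCache {
    // One entry per cached file, shared by all tasks on the TaskManager.
    private final ConcurrentMap<String, FutureTask<File>> cache =
            new ConcurrentHashMap<String, FutureTask<File>>();

    public File getFile(final String source) throws Exception {
        FutureTask<File> task = new FutureTask<File>(new Callable<File>() {
            @Override
            public File call() {
                return copyToLocal(source);
            }
        });
        FutureTask<File> prev = cache.putIfAbsent(source, task);
        if (prev == null) {
            task.run();   // this task won the race and performs the copy
            prev = task;
        }
        return prev.get(); // all other tasks block until the copy has finished
    }

    private File copyToLocal(String source) {
        throw new UnsupportedOperationException("assumed helper: copy to a local file");
    }
}
{code}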

> DistributedCache doesn't preserve files for subsequent operations
> --
>
> Key: FLINK-1419
> URL: https://issues.apache.org/jira/browse/FLINK-1419
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.8, 0.9
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> When subsequent operations want to access the same files in the DC it 
> frequently happens that the files are not created for the following operation.
> This is fairly odd, since the DC is supposed to either a) preserve files when 
> another operation kicks in within a certain time window, or b) just recreate 
> the deleted files. Neither happens.
> Increasing the time window had no effect.
> I'd like to use this issue as a starting point for a more general discussion 
> about the DistributedCache. 
> Currently:
> 1. all files reside in a common job-specific directory
> 2. are deleted during the job.
>  
> One thing that was brought up about Trait 1 is that it basically forbids 
> modification of the files, concurrent access and all. Personally I'm not sure 
> if this is a problem. Changing it to a task-specific place solved the issue 
> though.
> I'm more concerned about Trait #2. Besides the mentioned issue, the deletion 
> is realized with the scheduler, which adds a lot of complexity to the current 
> code. (It really is a pain to work on...) 
> If we moved the deletion to the end of the job, it could be done as a clean-up 
> step in the TaskManager. With this we could reduce the DC to a 
> cacheFile(String source) method, the delete method in the TM, and throw out 
> everything else.
> Also, the current implementation implies that big files may be copied 
> multiple times. This may be undesired, depending on how big the files are.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1165] No createCollectionsEnvironment i...

2015-01-19 Thread ajaybhat
GitHub user ajaybhat opened a pull request:

https://github.com/apache/flink/pull/320

[FLINK-1165] No createCollectionsEnvironment in Java API

This commit adds a new method to ExecutionEnvironment to create a
CollectionEnvironment, and applies the method to cases where a
CollectionEnvironment may be needed.
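
A usage sketch of the added factory method (the method name follows this PR's
title; treat the details as assumptions):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CollectionEnvExample {
    public static void main(String[] args) throws Exception {
        // Runs the program on Java collections in a single JVM; handy for tests.
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        DataSet<String> words = env.fromElements("to", "be", "or", "not");
        words.print();
        env.execute();
    }
}
```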

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ajaybhat/flink FLINK-1165

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/320.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #320


commit baace5236340831789cddffaa75865f3c5dc3bb3
Author: Ajay Bhat 
Date:   2015-01-19T10:45:24Z

[FLINK-1165] No createCollectionsEnvironment in Java API

This commit adds a new method to ExecutionEnvironment to create a
CollectionEnvironment, and applies the method to cases where a
CollectionEnvironment() may be needed




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-1419) DistributedCache doesn't preserve files for subsequent operations

2015-01-19 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-1419:
---

Assignee: Chesnay Schepler

> DistributedCache doesn't preserve files for subsequent operations
> --
>
> Key: FLINK-1419
> URL: https://issues.apache.org/jira/browse/FLINK-1419
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.8, 0.9
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> When subsequent operations want to access the same files in the DC it 
> frequently happens that the files are not created for the following operation.
> This is fairly odd, since the DC is supposed to either a) preserve files when 
> another operation kicks in within a certain time window, or b) just recreate 
> the deleted files. Neither happens.
> Increasing the time window had no effect.
> I'd like to use this issue as a starting point for a more general discussion 
> about the DistributedCache. 
> Currently:
> 1. all files reside in a common job-specific directory
> 2. are deleted during the job.
>  
> One thing that was brought up about Trait 1 is that it basically forbids 
> modification of the files, concurrent access and all. Personally I'm not sure 
> if this is a problem. Changing it to a task-specific place solved the issue 
> though.
> I'm more concerned about Trait #2. Besides the mentioned issue, the deletion 
> is realized with the scheduler, which adds a lot of complexity to the current 
> code. (It really is a pain to work on...) 
> If we moved the deletion to the end of the job, it could be done as a clean-up 
> step in the TaskManager. With this we could reduce the DC to a 
> cacheFile(String source) method, the delete method in the TM, and throw out 
> everything else.
> Also, the current implementation implies that big files may be copied 
> multiple times. This may be undesired, depending on how big the files are.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
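
To make the proposed simplification concrete, here is a hypothetical sketch of
the reduced surface; every name and signature below is an illustrative
assumption, not actual Flink code:

```java
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;

// Hypothetical reduced DistributedCache: a single cacheFile method, with
// deletion handled by the TaskManager as a job-level clean-up step instead
// of a scheduled, time-window-based deletion.
interface SimplifiedDistributedCache {

    // Copies 'source' into a task-specific cache directory and returns the
    // local copy; no time-window bookkeeping, no scheduler involvement.
    File cacheFile(String source) throws IOException;
}

class TaskManagerCacheCleanup {

    // Called once when the job finishes: remove the job-specific cache
    // directory in a single step.
    void onJobFinished(File jobCacheDir) throws IOException {
        FileUtils.deleteDirectory(jobCacheDir);
    }
}
```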


[jira] [Commented] (FLINK-1418) Make 'print()' output on the client command line, rather than on the task manager sysout

2015-01-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282360#comment-14282360
 ] 

Fabian Hueske commented on FLINK-1418:
--

I agree, printing to the TMs' std-out is pretty pointless and doesn't help 
anybody, and local testing will not be affected.
However, this change should come with a big warning sign in the documentation 
and JavaDocs, because it might cause huge results to be transferred to the 
client. 

> Make 'print()' output on the client command line, rather than on the task 
> manager sysout
> 
>
> Key: FLINK-1418
> URL: https://issues.apache.org/jira/browse/FLINK-1418
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>
> Right now, the {{print()}} command prints inside the data sinks where the 
> code runs. It should pull data back to the client and print it there.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
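
For illustration, a hedged sketch of what client-side printing would look like
from a user program, assuming a collect()-style operation ships the result back
to the client (the actual transport chosen for this issue is not specified here):

```java
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ClientSidePrintSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.fromElements("a", "b", "c");

        // Pull the result into the client JVM and print it there, instead of
        // printing inside the data sinks on the task managers. This is exactly
        // why large results need the warning Fabian mentions above.
        List<String> local = data.collect();
        for (String s : local) {
            System.out.println(s);
        }
    }
}
```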


[jira] [Commented] (FLINK-655) Add support for both single and set of broadcast values

2015-01-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282334#comment-14282334
 ] 

Fabian Hueske commented on FLINK-655:
-

If we add support to read a scalar value from the RuntimeContext, we should 
rename the {{getBroadcastVariable}} method to make clear that it returns 
multiple values. 
The singular form of {{getBroadcastVariable}} might otherwise suggest that this 
method returns a single value and collide with the semantics of the 
scalar-returning method.

> Add support for both single and set of broadcast values
> ---
>
> Key: FLINK-655
> URL: https://issues.apache.org/jira/browse/FLINK-655
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Ufuk Celebi
>Assignee: Henry Saputra
>  Labels: breaking-api, github-import, starter
> Fix For: pre-apache
>
>
> To broadcast a data set you have to do the following:
> ```java
> lorem.map(new MyMapper()).withBroadcastSet(toBroadcast, "toBroadcastName")
> ```
> In the operator you call:
> ```java
> getRuntimeContext().getBroadcastVariable("toBroadcastName")
> ```
> I propose to have both method names consistent, e.g.
>   - `withBroadcastVariable(DataSet, String)`, or
>   - `getBroadcastSet(String)`.
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/655
> Created by: [uce|https://github.com/uce]
> Labels: enhancement, java api, user satisfaction, 
> Created at: Wed Apr 02 16:29:08 CEST 2014
> State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
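
To illustrate the naming collision, a sketch of how the two accessors would sit
next to each other in a rich function; getBroadcastValue is a hypothetical name
for the proposed scalar getter, not an existing method:

```java
import java.util.List;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ThresholdMapper extends RichMapFunction<Integer, Integer> {

    private int threshold;

    @Override
    public void open(Configuration parameters) {
        // Existing accessor: despite the singular-sounding name, it returns a List.
        List<Integer> thresholds =
                getRuntimeContext().getBroadcastVariable("thresholds");
        this.threshold = thresholds.get(0);

        // A scalar accessor as discussed in this thread might look like:
        // int t = getRuntimeContext().getBroadcastValue("threshold");
    }

    @Override
    public Integer map(Integer value) {
        return value > threshold ? value : 0;
    }
}
```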


[GitHub] flink pull request: [FLINK-1296] Add sorter support for very large...

2015-01-19 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/249#issuecomment-70470401
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-1419) DistributedCache doesn't preserve files for subsequent operations

2015-01-19 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-1419:
---

 Summary: DistributedCache doesn't preserve files for subsequent 
operations
 Key: FLINK-1419
 URL: https://issues.apache.org/jira/browse/FLINK-1419
 Project: Flink
  Issue Type: Bug
Affects Versions: 0.8, 0.9
Reporter: Chesnay Schepler


When subsequent operations want to access the same files in the DC, it 
frequently happens that the files are not created for the following operation.

This is fairly odd, since the DC is supposed to either a) preserve files when 
another operation kicks in within a certain time window, or b) just recreate 
the deleted files. Neither happens, and increasing the time window had no effect.

I'd like to use this issue as a starting point for a more general discussion 
about the DistributedCache. 

Currently:
1. all files reside in a common job-specific directory, and
2. files are deleted during the job.
 
One thing that was brought up about Trait 1 is that it basically forbids 
modification of the files, including any concurrent access. Personally I'm not 
sure if this is a problem. Changing it to a task-specific place solved the 
issue, though.

I'm more concerned about Trait 2. Besides the mentioned issue, the deletion is 
realized with the scheduler, which adds a lot of complexity to the current 
code. (It really is a pain to work on...) 
If we moved the deletion to the end of the job, it could be done as a clean-up 
step in the TaskManager. With this we could reduce the DC to a cacheFile(String 
source) method, the delete method in the TM, and throw out everything else.
Also, the current implementation implies that big files may be copied multiple 
times. This may be undesired, depending on how big the files are.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1400) In local mode, the default TaskManager won't listen on the data port.

2015-01-19 Thread Sergey Dudoladov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282314#comment-14282314
 ] 

Sergey Dudoladov commented on FLINK-1400:
-


Thanks, Stephan. This solves my problem.

Perhaps it is worth documenting this expected behavior? For example, the JM web 
interface could display "Not applicable" for the default TM instead of the data 
port number, so that further confusion can be avoided.

> In local mode, the default TaskManager won't listen on the data port.
> -
>
> Key: FLINK-1400
> URL: https://issues.apache.org/jira/browse/FLINK-1400
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.9
> Environment: Ubuntu 14.04 LTS
>Reporter: Sergey Dudoladov
>Priority: Minor
>
>  The Task Manager automatically started by the Job Manager (JobManager.scala, 
> approx. line 470) in local mode does not listen on the data port. 
> To reproduce:
> 1) Start Flink via ./start-local.sh
> 2) Look up the data port number on localhost:8081 -> "Task Managers" tab
> 3) sudo netstat -taupen | grep "dataport_number"
>  
> Alternatively, start a second Task Manager and run Flink with a degree of 
> parallelism of 2 (assuming one slot per Task Manager):
> 4) ./flink run -p 2 ...
> Task Managers started via ./taskmanager.sh work fine.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...

2015-01-19 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/304#issuecomment-70467240
  
I guess you are referring to this commit? 
https://github.com/apache/flink/commit/020b282bdc5468aa51b231e9ae8d4d3a1a76e696

I don't see a reason NOT to add commonly used types to Kryo by default. We 
shouldn't claim full Joda-Time support anywhere, but having, for example, 
Joda-Time's DateTime activated by default will lead to a much better user 
experience.
If more users were using other Joda-Time classes, there would be serializers 
for those as well.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
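
For context, registering Joda-Time's DateTime with Kryo is typically done
through the kryo-serializers add-on library; whether the Flink commit wires it
up exactly this way is an assumption:

```java
import com.esotericsoftware.kryo.Kryo;
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
import org.joda.time.DateTime;

public class JodaKryoSketch {
    public static void main(String[] args) {
        Kryo kryo = new Kryo();

        // Register a dedicated serializer for DateTime; without it, Kryo falls
        // back to generic field serialization, which is slow and fragile for
        // Joda-Time's internal structures.
        kryo.register(DateTime.class, new JodaDateTimeSerializer());
    }
}
```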


[jira] [Commented] (FLINK-1098) flatArray() operator that converts arrays to elements

2015-01-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282286#comment-14282286
 ] 

Fabian Hueske commented on FLINK-1098:
--

I like the idea 
+1

> flatArray() operator that converts arrays to elements
> -
>
> Key: FLINK-1098
> URL: https://issues.apache.org/jira/browse/FLINK-1098
> Project: Flink
>  Issue Type: New Feature
>Reporter: Timo Walther
>Priority: Minor
>
> It would be great to have an operator that converts, e.g., a DataSet<String[]> 
> into a DataSet<String>. Actually, it is just a flatMap over the elements of an 
> array.
> A typical use case is a WordCount, where we could then write:
> {code}
> text
> .map((line) -> line.toLowerCase().split("\\W+"))
> .flatArray()
> .map((word) -> new Tuple2<>(word, 1))
> .groupBy(0)
> .sum(1);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
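
Until such an operator exists, the same effect can be achieved with a plain
flatMap. A minimal sketch of what flatArray() would desugar to (the flatArray
name itself is the proposal, not existing API):

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

public class FlatArraySketch {

    // Equivalent of the proposed flatArray(): emit every element of each array.
    public static DataSet<String> flatArray(DataSet<String[]> arrays) {
        return arrays.flatMap(new FlatMapFunction<String[], String>() {
            @Override
            public void flatMap(String[] value, Collector<String> out) {
                for (String word : value) {
                    out.collect(word);
                }
            }
        });
    }
}
```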