[jira] [Updated] (FLINK-1994) Add different gain calculation schemes to SGD

2015-07-31 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-1994:
-
Assignee: Trevor Grant

 Add different gain calculation schemes to SGD
 -

 Key: FLINK-1994
 URL: https://issues.apache.org/jira/browse/FLINK-1994
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Trevor Grant
Priority: Minor
  Labels: ML, Starter

 The current SGD implementation uses the formula 
 {{stepsize/sqrt(iterationNumber)}} as the gain for the weight updates. It 
 would be good to make the gain calculation configurable and to provide 
 different strategies for that, for example:
 * stepsize/(1 + iterationNumber)
 * stepsize*(1 + regularization * stepsize * iterationNumber)^(-3/4)
 See also how to properly select the gains [1].
 Resources:
 [1] http://arxiv.org/pdf/1107.2490.pdf
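The schedules above can be sketched as a small strategy interface. All names here (`GainSchedule`, `SQRT`, `INVERSE`, `XU`) are illustrative, not Flink ML's actual API:

```java
// Sketch of pluggable gain (learning-rate) schedules for SGD.
// Interface and constant names are illustrative, not Flink's actual API.
public class GainSchedules {

    interface GainSchedule {
        double gain(double stepsize, double regularization, int iteration);
    }

    // Current behaviour: stepsize / sqrt(t)
    public static final GainSchedule SQRT = (s, r, t) -> s / Math.sqrt(t);

    // Alternative 1: stepsize / (1 + t)
    public static final GainSchedule INVERSE = (s, r, t) -> s / (1 + t);

    // Alternative 2 (see arXiv:1107.2490): stepsize * (1 + reg*stepsize*t)^(-3/4)
    public static final GainSchedule XU = (s, r, t) -> s * Math.pow(1 + r * s * t, -0.75);

    public static void main(String[] args) {
        double s = 0.1, r = 0.01;
        // all schedules start near the base stepsize and decay with t
        System.out.println(SQRT.gain(s, r, 1));    // 0.1
        System.out.println(INVERSE.gain(s, r, 1)); // 0.05
        assert Math.abs(SQRT.gain(s, r, 4) - 0.05) < 1e-12;
        assert XU.gain(s, r, 100) < s;
    }
}
```

The SGD solver would then take one `GainSchedule` as a configuration parameter instead of hard-coding the square-root decay.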



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1994) Add different gain calculation schemes to SGD

2015-07-31 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649172#comment-14649172
 ] 

Till Rohrmann commented on FLINK-1994:
--

Great to hear [~rawkintrevo]. I assigned the issue to you.



[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/957


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-2248) Allow disabling of stdout logging output

2015-07-31 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-2248:
--
Assignee: Sachin Goel

 Allow disabling of stdout logging output
 

 Key: FLINK-2248
 URL: https://issues.apache.org/jira/browse/FLINK-2248
 Project: Flink
  Issue Type: Improvement
Reporter: Theodore Vasiloudis
Assignee: Sachin Goel
Priority: Minor
  Labels: starter

 Currently when a job is submitted through the CLI we get in stdout all the 
 log output about each stage in the job.
 It would be useful to have an easy way to disable this output when submitting 
 the job, as most of the time we are only interested in the log output if 
 something goes wrong.
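A minimal sketch of the requested switch: status messages go to stdout only while a flag is set, so a CLI option can turn them off. The flag and method names are illustrative, not the actual Flink client API:

```java
// Sketch of a client-side "quiet" switch for stdout status output.
// Flag and method names are illustrative, not the actual Flink client API.
public class SysoutToggle {

    private boolean printStatusDuringExecution = true;

    public void setPrintStatusDuringExecution(boolean print) {
        this.printStatusDuringExecution = print;
    }

    /** Prints the status message if enabled; returns whether it was printed. */
    public boolean logStatus(String message) {
        if (printStatusDuringExecution) {
            System.out.println(message);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SysoutToggle client = new SysoutToggle();
        client.logStatus("07/31/2015 12:00:00 Job execution switched to status RUNNING.");
        // e.g. triggered by a hypothetical -q flag on the CLI
        client.setPrintStatusDuringExecution(false);
        client.logStatus("this line is suppressed");
    }
}
```

A logging framework would still capture the messages regardless of this flag; only the stdout echo is suppressed.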





[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...

2015-07-31 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/966#issuecomment-126705428
  
Thanks for the pull request!

You did not convert all InputFormats and OutputFormats. There are a few 
more, e.g. HadoopInputFormatBase or JDBCInputFormat. Other than that, your pull 
request looks good. I think it is fine to keep the notion of normal and rich, 
just like we do for the operator user functions. That also means that we do not 
break the existing interfaces.




[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649156#comment-14649156
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126674684
  
Thank you for your patience :) I will merge this once the tests have passed.




[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126674684
  
Thank you for your patience :) I will merge this once the tests have passed.




[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...

2015-07-31 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/966#issuecomment-126706999
  
Ah yes. I'll update them in a while. There's actually some problem with the 
unit test I've written too. Travis fails sporadically.




[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649271#comment-14649271
 ] 

ASF GitHub Bot commented on FLINK-1819:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/966#issuecomment-126706999
  
Ah yes. I'll update them in a while. There's actually some problem with the 
unit test I've written too. Travis fails sporadically.


 Allow access to RuntimeContext from Input and OutputFormats
 ---

 Key: FLINK-1819
 URL: https://issues.apache.org/jira/browse/FLINK-1819
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 0.9, 0.8.1
Reporter: Fabian Hueske
Priority: Minor
 Fix For: 0.9


 User functions that extend a RichFunction can access a {{RuntimeContext}} 
 which gives the parallel id of the task and access to Accumulators and 
 BroadcastVariables. 
 Right now, Input and OutputFormats cannot access their {{RuntimeContext}}.





[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649146#comment-14649146
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126672392
  
Yeah. I think we should remove it then. Too many entries tend to confuse 
people.




[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126672392
  
Yeah. I think we should remove it then. Too many entries tend to confuse 
people.




[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...

2015-07-31 Thread sachingoel0101
GitHub user sachingoel0101 opened a pull request:

https://github.com/apache/flink/pull/966

[FLINK-1819][core]Allow access to RuntimeContext from Input and Output 
formats

1. Introduces new Rich Input and Output formats, similar to Rich Functions.
2. Makes all existing input and output formats rich, without any API 
breaking changes.
3. Provides RuntimeContext to GenericSink and GenericSource operators.
4. DataSourceTask and DataSinkTask set DistributedUDFContext to their input 
and output formats respectively.
5. Complete access to RuntimeContext, including access to accumulators.
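The "rich" format idea from point 1 can be sketched as a format subclass that carries a RuntimeContext, mirroring RichFunction. The class names below are illustrative stand-ins, not the actual Flink classes:

```java
// Sketch of a "rich" input format that can hold a RuntimeContext,
// mirroring RichFunction. Names are illustrative, not Flink's actual API.
import java.util.HashMap;
import java.util.Map;

public class RichFormatSketch {

    // Stand-in for Flink's RuntimeContext; the real one also exposes
    // accumulators and broadcast variables.
    static class RuntimeContext {
        final int indexOfThisSubtask;
        final Map<String, Object> accumulators = new HashMap<>();
        RuntimeContext(int index) { this.indexOfThisSubtask = index; }
    }

    // The rich variant adds a context without breaking the plain interface:
    // existing formats simply never have the context set.
    static abstract class RichInputFormat {
        private transient RuntimeContext runtimeContext;

        public void setRuntimeContext(RuntimeContext context) {
            this.runtimeContext = context;
        }

        public RuntimeContext getRuntimeContext() {
            if (runtimeContext == null) {
                throw new IllegalStateException("RuntimeContext has not been set.");
            }
            return runtimeContext;
        }
    }

    static class MyFormat extends RichInputFormat { }

    public static void main(String[] args) {
        MyFormat format = new MyFormat();
        // in the PR, the DataSourceTask/DataSinkTask perform this injection
        format.setRuntimeContext(new RuntimeContext(3));
        assert format.getRuntimeContext().indexOfThisSubtask == 3;
    }
}
```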


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sachingoel0101/flink flink-1819

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/966.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #966


commit 627ee290fb88ac3e670fe86a734116b06f431ade
Author: Sachin Goel sachingoel0...@gmail.com
Date:   2015-07-31T04:36:20Z

[FLINK-1819][core]Allow access to RuntimeContext from Input and Output
formats






[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649165#comment-14649165
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126675074
  
Sure. No problem. :)




[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649185#comment-14649185
 ] 

ASF GitHub Bot commented on FLINK-1819:
---

GitHub user sachingoel0101 opened a pull request:

https://github.com/apache/flink/pull/966



[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126672811
  
I've updated the code to remove any changes in Configuration.




[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649147#comment-14649147
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126672811
  
I've updated the code to remove any changes in Configuration.




[GitHub] flink pull request: [FLINK-2409] [webserver] Replaces ActorRefs wi...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/959#issuecomment-126698555
  
Travis failed only in one build where the `YARNSessionFIFOITCase` did not 
start. I suspect that it's due to the instability of the 
`YARNSessionFIFOITCase` test.




[jira] [Commented] (FLINK-2409) Old JM web interface is sending cancel messages w/o leader ID

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649234#comment-14649234
 ] 

ASF GitHub Bot commented on FLINK-2409:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/959#issuecomment-126698555
  
Travis failed only in one build where the `YARNSessionFIFOITCase` did not 
start. I suspect that it's due to the instability of the 
`YARNSessionFIFOITCase` test.


 Old JM web interface is sending cancel messages w/o leader ID
 -

 Key: FLINK-2409
 URL: https://issues.apache.org/jira/browse/FLINK-2409
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Till Rohrmann

 {code}
 12:29:41,877 ERROR akka.actor.OneForOneStrategy   
- Received a message CancelJob(4b3631741c344881362ea46e29980ce4) without a 
 leader session ID, even though it requires to have one.
 java.lang.Exception: Received a message 
 CancelJob(4b3631741c344881362ea46e29980ce4) without a leader session ID, even 
 though it requires to have one.
 at 
 org.apache.flink.runtime.LeaderSessionMessages$$anonfun$receive$1.applyOrElse(LeaderSessionMessages.scala:49)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at 
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at 
 org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
 at 
 org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
 at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
 at 
 org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at 
 org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:101)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
 at akka.dispatch.Mailbox.run(Mailbox.scala:221)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
 at 
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at 
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 12:29:41,879 INFO  org.apache.flink.runtime.jobmanager.JobManager 
- Stopping JobManager akka://flink/user/jobmanager#-638215033.
 {code}
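The guard that produces this error can be sketched as follows: messages must carry the current leader's session ID or they are rejected, which is exactly what happens when the old web interface sends a `CancelJob` without one. The class names are illustrative, not the actual JobManager code:

```java
// Sketch of the leader-session-ID guard behind the error above.
// Names are illustrative, not the actual Flink JobManager code.
import java.util.UUID;

public class LeaderSessionGuard {

    static class Message {
        final UUID leaderSessionID; // null models a message sent without one
        final String payload;
        Message(UUID id, String payload) {
            this.leaderSessionID = id;
            this.payload = payload;
        }
    }

    private final UUID currentLeaderSessionID = UUID.randomUUID();

    public UUID currentId() { return currentLeaderSessionID; }

    /** Returns true if the message is accepted, false if filtered out. */
    public boolean accept(Message m) {
        return m.leaderSessionID != null
                && m.leaderSessionID.equals(currentLeaderSessionID);
    }

    public static void main(String[] args) {
        LeaderSessionGuard guard = new LeaderSessionGuard();
        assert guard.accept(new Message(guard.currentId(), "CancelJob(...)"));
        // the bug in FLINK-2409: the old web interface sends no session ID
        assert !guard.accept(new Message(null, "CancelJob(...)"));
    }
}
```

The fix is therefore to have the web frontend stamp its cancel messages with the leader session ID it obtained from the JobManager.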





[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126675074
  
Sure. No problem. :)




[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126695988
  
Travis build passes. You can merge it @mxm 




[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649214#comment-14649214
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126695988
  
Travis build passes. You can merge it @mxm 




[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649269#comment-14649269
 ] 

ASF GitHub Bot commented on FLINK-1819:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/966#discussion_r35977130
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
 ---
@@ -75,6 +78,13 @@
// cancel flag
private volatile boolean taskCanceled = false;
 
+   /**
+* The accumulator map used in the RuntimeContext.
+*/
+   protected Map<String, Accumulator<?, ?>> accumulatorMap;
--- End diff --

This field should be final. Or you get rid of the field and just access the 
accumulators via `getEnvironment().getAccumulatorRegistry().getUserMap()`.




[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984232
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli trials. For sampling without 
replacement, each element's sample choice is an independent Bernoulli trial.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+   
+   private final double fraction;
+   private final Random random;
+   
+   /**
+* Create a Bernoulli sampler with the sample fraction and the default 
random number generator.
+*
+* @param fraction sample fraction, aka the Bernoulli sampler 
probability.
+*/
+   public BernoulliSampler(double fraction) {
+   this(fraction, new Random());
+   }
+   
+   /**
+* Create a Bernoulli sampler with the sample fraction and a random number 
generator seed.
+*
+* @param fraction sample fraction, aka the Bernoulli sampler 
probability.
+* @param seed random number generator seed.
+*/
+   public BernoulliSampler(double fraction, long seed) {
+   this(fraction, new Random(seed));
+   }
+   
+   /**
+* Create a Bernoulli sampler with the sample fraction and a random number 
generator.
+*
+* @param fraction sample fraction, aka the Bernoulli sampler 
probability.
+* @param random   the random number generator.
+*/
+   public BernoulliSampler(double fraction, Random random) {
+   Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, 
"fraction must be between [0, 1].");
+   this.fraction = fraction;
+   this.random = random;
+   }
+   
+   /**
+* Sample the input elements; for each input element, take a Bernoulli 
trial to decide whether it is included.
+*
+* @param input elements to be sampled.
+* @return the sampled result, which is lazily computed upon the input 
elements.
+*/
+   @Override
+   public Iterator<T> sample(final Iterator<T> input) {
+   if (fraction == 0) {
+   return EMPTY_ITERABLE;
+   }
+   
+   return new SampledIterator<T>() {
+   T current;
+   
+   @Override
+   public boolean hasNext() {
+   if (current == null) {
+   while (input.hasNext()) {
+   T element = input.next();
+   if (random.nextDouble() <= 
fraction) {
+   current = element;
+   return true;
+   }
+   }
+   current = null;
+   return false;
+   }
+   return false;
--- End diff --

I think, if I'm not mistaken, that `hasNext` has to be idempotent. Thus it 
should return `true` if `current != null`.
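The idempotent version the review asks for can be sketched as follows: once an element is buffered in `current`, repeated `hasNext` calls return `true` without consuming further input. This is a standalone sketch, not the PR's actual code:

```java
// Sketch of an idempotent hasNext for a Bernoulli-sampling iterator:
// repeated calls must not consume input once an element is buffered.
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;

public class BernoulliIteratorSketch {

    public static <T> Iterator<T> sample(Iterator<T> input, double fraction, long seed) {
        final Random random = new Random(seed);
        return new Iterator<T>() {
            T current;

            @Override
            public boolean hasNext() {
                if (current != null) {
                    return true; // idempotent: element already buffered
                }
                while (input.hasNext()) {
                    T element = input.next();
                    if (random.nextDouble() <= fraction) {
                        current = element;
                        return true;
                    }
                }
                return false;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    return null;
                }
                T result = current;
                current = null; // force hasNext to advance next time
                return result;
            }
        };
    }

    public static void main(String[] args) {
        Iterator<Integer> it = sample(Arrays.asList(1, 2, 3).iterator(), 1.0, 42L);
        // with fraction 1.0 every element is kept, and hasNext is stable
        assert it.hasNext() && it.hasNext();
        assert it.next() == 1 && it.next() == 2 && it.next() == 3;
        assert !it.hasNext();
    }
}
```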




[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984950
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/RandomSampler.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import java.util.Iterator;
+
+public abstract class RandomSampler<T> {
+   
+   protected final Iterator<T> EMPTY_ITERABLE = new SampledIterator<T>() {
+   @Override
+   public boolean hasNext() {
+   return false;
+   }
+   
+   @Override
+   public T next() {
+   return null;
+   }
+   };
+   
+   /**
+* Randomly sample the elements from input, and return the result 
iterator.
+*
+* @param input source data
+* @return the sample result.
+*/
+   public abstract Iterator<T> sample(Iterator<T> input);
+}
+
+abstract class SampledIterator<T> implements Iterator<T> {
--- End diff --

JavaDocs are missing




[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests

2015-07-31 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649394#comment-14649394
 ] 

Sachin Goel commented on FLINK-2449:


Or rather override it in your test class to return true.
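The suggested override pattern can be sketched as follows: the test base exposes a hook deciding whether the collection-execution variant runs, and a test that relies on DistCache opts out by overriding it. The class and method names here are illustrative, not the actual Flink test API:

```java
// Sketch of opting out of collection execution via an overridable hook.
// Class and method names are illustrative, not the actual Flink test API.
public class CollectionExecutionOptOut {

    static class ProgramTestBase {
        // default: also run the job with collection execution
        public boolean skipCollectionExecution() {
            return false;
        }
    }

    static class DistCacheTest extends ProgramTestBase {
        @Override
        public boolean skipCollectionExecution() {
            return true; // DistCache is not registered in collection mode
        }
    }

    public static void main(String[] args) {
        assert !new ProgramTestBase().skipCollectionExecution();
        assert new DistCacheTest().skipCollectionExecution();
    }
}
```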

 DistCache doesn't work with JavaProgram Collection tests
 

 Key: FLINK-2449
 URL: https://issues.apache.org/jira/browse/FLINK-2449
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When attempting to retrieve a file during the Collection test in 
 JavaProgramTestBase you'll encounter an exception claiming the file was never 
 registered.





[jira] [Created] (FLINK-2451) Cleanup Gelly examples

2015-07-31 Thread Vasia Kalavri (JIRA)
Vasia Kalavri created FLINK-2451:


 Summary: Cleanup Gelly examples
 Key: FLINK-2451
 URL: https://issues.apache.org/jira/browse/FLINK-2451
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.10
Reporter: Vasia Kalavri
Assignee: Vasia Kalavri
Priority: Minor


As per discussion in the dev@ mailing list, this issue proposes the following 
changes to the Gelly examples and library:

1. Keep the following examples as they are:
EuclideanGraphWeighing, GraphMetrics, IncrementalSSSP, JaccardSimilarity,  
MusicProfiles.
2. Keep only 1 example to show how to use library methods.
3. Add 1 example for vertex-centric iterations.
4. Keep 1 example for GSA iterations and move the redundant GSA implementations 
to the library.
5. Improve the examples documentation and refer to the functionality that each 
of them demonstrates.
6. Port and modify existing example tests accordingly.





[GitHub] flink pull request: Framesize fix

2015-07-31 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/934#discussion_r35991461
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -893,13 +1086,11 @@ public Object call() throws Exception {
}
}, executionContext);
break;
-   }
-   else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
+   } else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
postRunCleanup();
break;
}
-   }
-   else {
+   } else {
--- End diff --

You're touching a lot of code here (and above) for no other reason than 
reformatting. We have no style guide yet that enforces this formatting. In 
general, we try not to touch existing lines to keep the latest history accurate.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Framesize fix

2015-07-31 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/934#discussion_r35991960
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -560,53 +621,177 @@ public ExecutionContext getExecutionContext() {
}
 
/**
+* This works as a cache for already merged accumulators because, in some cases,
+* we do not want to re-merge accumulators as this may lead to duplicate entries.
+*/
+   private Map<String, Accumulator<?, ?>> mergedSmallUserAccumulators;
--- End diff --

Why can it lead to duplicate entries? The accumulators should always be 
merged from scratch. Every accumulator update message contains the current 
state of the accumulators.




[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984921
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/RandomSampler.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import java.util.Iterator;
+
+public abstract class RandomSampler<T> {
--- End diff --

JavaDocs are missing




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649351#comment-14649351
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984921
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/RandomSampler.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import java.util.Iterator;
+
+public abstract class RandomSampler<T> {
--- End diff --

JavaDocs are missing


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.





[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-126740651
  
Thanks for your contribution @ChengXiangLi. The code is really well tested 
and well structured. Great work :-)

I had only some minor comments. There is, however, one thing I'm not so sure 
about. With the current implementation, all parallel tasks of the sampling 
operator will get the same random generator/seed value. Thus, every node will 
generate the same sequence of random numbers. I think this can have a negative 
influence on the sampling. What we could do is use `RichMapPartitionFunction` 
instead of `MapPartitionFunction`. With the rich function, we either have access 
to the subtask index, given by `getRuntimeContext().getIndexOfThisSubtask()`, 
which we could use to modify the initial seed, or we can create the random 
number generator in the `open` method (this method is executed on the 
TaskManager). Assuming that the clocks are not completely synchronized and that 
the individual tasks are not instantiated at the same time, this could give us 
less correlated random number sequences. What do you think? 
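The subtask-index idea can be tried outside of Flink; below is a minimal plain-Java sketch (class and method names are hypothetical, and the mixing constant is just one reasonable choice) of deriving a distinct random generator per subtask from one base seed:

```java
import java.util.Random;

// Hypothetical sketch: mix the parallel subtask index into a shared base
// seed so that each sampling subtask draws a different random sequence.
// In Flink, the index would come from getRuntimeContext().getIndexOfThisSubtask().
public class SeedPerSubtask {

    // Any bijective mixing works; here a multiply-and-add with a large odd constant.
    public static Random rngForSubtask(long baseSeed, int subtaskIndex) {
        return new Random(baseSeed + 0x9E3779B97F4A7C15L * (subtaskIndex + 1));
    }

    public static void main(String[] args) {
        Random r0 = rngForSubtask(42L, 0);
        Random r1 = rngForSubtask(42L, 1);
        // Different subtasks now produce diverging sequences,
        // while the same (seed, index) pair stays reproducible.
        if (r0.nextLong() == r1.nextLong()) {
            throw new AssertionError("subtask RNGs should diverge");
        }
    }
}
```

The alternative from the comment, seeding in `open()` from the wall clock, trades reproducibility for decorrelation; deriving the seed from the subtask index keeps runs repeatable.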




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649332#comment-14649332
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984232
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli trials. For sampling without replacement, each element's sample choice is just a Bernoulli trial.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+   
+   private final double fraction;
+   private final Random random;
+   
+   /**
+* Create a Bernoulli sampler with sample fraction and default random number generator.
+*
+* @param fraction sample fraction, aka the Bernoulli sampler probability.
+*/
+   public BernoulliSampler(double fraction) {
+   this(fraction, new Random());
+   }
+   
+   /**
+* Create a Bernoulli sampler with sample fraction and random number generator seed.
+*
+* @param fraction sample fraction, aka the Bernoulli sampler probability.
+* @param seed random number generator seed.
+*/
+   public BernoulliSampler(double fraction, long seed) {
+   this(fraction, new Random(seed));
+   }
+   
+   /**
+* Create a Bernoulli sampler with sample fraction and random number generator.
+*
+* @param fraction sample fraction, aka the Bernoulli sampler probability.
+* @param random   the random number generator.
+*/
+   public BernoulliSampler(double fraction, Random random) {
+   Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction must be between [0, 1].");
+   this.fraction = fraction;
+   this.random = random;
+   }
+   
+   /**
+* Sample the input elements; for each input element, take a Bernoulli trial for the sample.
+*
+* @param input elements to be sampled.
+* @return the sampled result, which is lazily computed upon the input elements.
+*/
+   @Override
+   public Iterator<T> sample(final Iterator<T> input) {
+   if (fraction == 0) {
+   return EMPTY_ITERABLE;
+   }
+   
+   return new SampledIterator<T>() {
+   T current;
+   
+   @Override
+   public boolean hasNext() {
+   if (current == null) {
+   while (input.hasNext()) {
+   T element = input.next();
+   if (random.nextDouble() <= fraction) {
+   current = element;
+   return true;
+   }
+   }
+   current = null;
+   return false;
+   }
+   return false;
--- End diff --

I think, if I'm not mistaken, that `hasNext` has to be idempotent. Thus it 
should return `true` if `current != null`.

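The idempotent `hasNext` suggested above can be sketched in plain Java, independent of the PR's `SampledIterator` (class name hypothetical): the accepted element is cached, and `hasNext` keeps returning `true` until `next()` consumes it.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

// Sketch of a Bernoulli-sampling iterator with an idempotent hasNext():
// repeated hasNext() calls do not advance the input; they return true
// until next() hands the cached element out. Assumes non-null elements.
public class BernoulliIterator<T> implements Iterator<T> {
    private final Iterator<T> input;
    private final double fraction;
    private final Random random;
    private T current; // cached accepted element, null if none buffered

    public BernoulliIterator(Iterator<T> input, double fraction, long seed) {
        this.input = input;
        this.fraction = fraction;
        this.random = new Random(seed);
    }

    @Override
    public boolean hasNext() {
        if (current != null) {
            return true; // idempotent: an accepted element is still buffered
        }
        while (input.hasNext()) {
            T element = input.next();
            if (random.nextDouble() <= fraction) {
                current = element;
                return true;
            }
        }
        return false;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = current;
        current = null; // consume the buffered element
        return result;
    }

    public static void main(String[] args) {
        Iterator<Integer> it =
            new BernoulliIterator<>(Arrays.asList(1, 2, 3, 4, 5).iterator(), 1.0, 0L);
        // With fraction 1.0 every element is kept; calling hasNext() twice is safe.
        if (!(it.hasNext() && it.hasNext())) throw new AssertionError();
        if (it.next() != 1) throw new AssertionError();
    }
}
```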

 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: 

[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-07-31 Thread nltran
GitHub user nltran opened a pull request:

https://github.com/apache/flink/pull/967

Stale Synchronous Parallel Iterations

Here is a pull request containing our development on [Stale Synchronous 
Parallel 
(SSP)](http://reports-archive.adm.cs.cmu.edu/anon/ml2013/CMU-ML-13-103.pdf) 
iterations. The pull request contains:

* Code supporting SSP
* API to configure and enable SSP from the ExecutionEnvironment

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nltran/flink SSP

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/967.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #967


commit 0435bb9114f48aeefb330e7cf92610be84f79c81
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:21:08Z

* Added the model for parameter server and parameter element
* Added parameter server implementation based on Apache Ignite
* Started instance of parameter server in TaskManager
* Added Apache Ignite as dependency in pom.xml

commit decc66db856d3d640e7e29128e2696c0941091dd
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:24:32Z

* Extended RichMapFunction with methods to access parameter server

commit 55398da553ce697af9bea881ea7131818edb82d2
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:25:08Z

* Extended DataSet API to enable SSP and configuration

commit 6ef80ceac5e6ef897769e1883823d152d0c04070
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:25:55Z

* Extended ExecutionEnvironment and ExecutionConfig to enable SSP and SPP 
configuration

commit 368ca1c101034aefa8b6ac0ce4791133976831e0
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:29:29Z

* Added drop-in control structures for Stale Synchronous Parallel iterations

commit 23fb6518fe7042ad859bb9474266131f2bdb669c
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:30:31Z

* Added the events used by the control structures for Stale Synchronous 
Parallel iterations

commit 48810bde7783610112751f2221126b69a1ac9b56
Author: Nam-Luc Tran namluc.t...@euranova.eu
Date:   2015-07-31T15:32:02Z

* Extended the job translation to take into account the control structures
  related to Stale Synchronous Parallel Iterations






[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35988629
  
--- Diff: pom.xml ---
@@ -224,6 +224,12 @@ under the License.
<version>3.2.1</version>
</dependency>
 
+   <dependency>
+   <groupId>org.apache.commons</groupId>
+   <artifactId>commons-math3</artifactId>
+   <version>3.5</version>
+   </dependency>
--- End diff --

For that we have to add an entry in the `flink-dist/NOTICE` and 
`flink-dist/LICENSE` files. But I can do that when merging the PR.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649419#comment-14649419
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35988629
  
--- Diff: pom.xml ---
@@ -224,6 +224,12 @@ under the License.
<version>3.2.1</version>
</dependency>
 
+   <dependency>
+   <groupId>org.apache.commons</groupId>
+   <artifactId>commons-math3</artifactId>
+   <version>3.5</version>
+   </dependency>
--- End diff --

For that we have to add an entry in the `flink-dist/NOTICE` and 
`flink-dist/LICENSE` files. But I can do that when merging the PR.


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.





[jira] [Commented] (FLINK-2446) SocketTextStreamFunction has memory leak when reconnect server

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649385#comment-14649385
 ] 

ASF GitHub Bot commented on FLINK-2446:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/965


 SocketTextStreamFunction has memory leak when reconnect server
 --

 Key: FLINK-2446
 URL: https://issues.apache.org/jira/browse/FLINK-2446
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: fangfengbin
Priority: Minor







[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649341#comment-14649341
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984449
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on Poisson Distribution. While sample 
elements with replacement,
+ * the picked number of each element follow poisson distribution, so we 
could use poisson distribution
--- End diff --

Typo: element follows a given poisson distribution

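The javadoc's idea, drawing a Poisson-distributed copy count per element when sampling with replacement, can be sketched without the commons-math dependency using Knuth's inverse-transform generator (a plain-Java sketch suitable for small means, with hypothetical names; it is not the PR's implementation, which uses `PoissonDistribution` from commons-math):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Sketch: with-replacement sampling where each input element is emitted
// k times, with k drawn from Poisson(fraction). Knuth's algorithm is fine
// for small means; commons-math's PoissonDistribution scales better.
public class PoissonSketch {

    static int poisson(double mean, Random random) {
        double l = Math.exp(-mean);
        int k = 0;
        double p = 1.0;
        do {
            k++;
            p *= random.nextDouble();
        } while (p > l);
        return k - 1;
    }

    static <T> List<T> sample(List<T> input, double fraction, long seed) {
        Random random = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T element : input) {
            int copies = poisson(fraction, random);
            for (int i = 0; i < copies; i++) {
                out.add(element); // the same element may appear several times
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // A mean of 0 yields zero copies of every element.
        List<Integer> sampled = sample(Arrays.asList(1, 2, 3, 4), 0.0, 1L);
        if (!sampled.isEmpty()) throw new AssertionError();
    }
}
```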

 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.





[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649337#comment-14649337
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984385
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on Poisson Distribution. While sample 
elements with replacement,
--- End diff --

While sampling elements


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.





[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35985033
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling with 
replacement, and with only one pass through
+ * the input iteration whose size is unpredictable.
+ * This implementation refers to the algorithm described in "Reservoir-based Random Sampling with Replacement from Data Stream".
--- End diff --

Maybe put a link to the paper in here.




[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35985704
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithoutReplacement.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling without 
replacement, and with only one pass through
+ * the input iteration whose size is unpredictable.
+ * This implementation refers to the Algorithm R described in "Random Sampling with a Reservoir", Vitter, 1985.
--- End diff --

Maybe add again a link to the paper if available.

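For reference, Algorithm R from Vitter's paper is short; a plain-Java sketch of it (not the PR's code, names hypothetical):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Sketch of Vitter's Algorithm R: one pass, O(k) memory, and it works
// when the input size is unknown in advance. Element i (0-based) replaces
// a uniformly chosen reservoir slot with probability k / (i + 1).
public class AlgorithmR {

    static <T> List<T> sample(Iterator<T> input, int k, long seed) {
        Random random = new Random(seed);
        List<T> reservoir = new ArrayList<>(k);
        int i = 0;
        while (input.hasNext()) {
            T element = input.next();
            if (i < k) {
                reservoir.add(element); // fill the reservoir first
            } else {
                int j = random.nextInt(i + 1); // uniform in [0, i]
                if (j < k) {
                    reservoir.set(j, element); // keep with probability k/(i+1)
                }
            }
            i++;
        }
        return reservoir;
    }

    public static void main(String[] args) {
        List<Integer> in = new ArrayList<>();
        for (int n = 0; n < 100; n++) in.add(n);
        // A reservoir of 10 drawn from 100 elements always has exactly 10 entries.
        if (sample(in.iterator(), 10, 7L).size() != 10) throw new AssertionError();
    }
}
```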



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649372#comment-14649372
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35985704
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithoutReplacement.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling without 
replacement, and with only one pass through
+ * the input iteration whose size is unpredictable.
+ * This implementation refers to the Algorithm R described in "Random Sampling with a Reservoir", Vitter, 1985.
--- End diff --

Maybe add again a link to the paper if available.


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.





[jira] [Closed] (FLINK-2446) SocketTextStreamFunction has memory leak when reconnect server

2015-07-31 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi closed FLINK-2446.
-
   Resolution: Fixed
Fix Version/s: 0.10

Fixed via b0f2379.

 SocketTextStreamFunction has memory leak when reconnect server
 --

 Key: FLINK-2446
 URL: https://issues.apache.org/jira/browse/FLINK-2446
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: fangfengbin
Priority: Minor
 Fix For: 0.10








[GitHub] flink pull request: [FLINK-2436] [streaming] Make ByteStreamStateH...

2015-07-31 Thread gyfora
Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/958#issuecomment-126733653
  
If you could take a quick look @uce,  I would like to merge this :)

I think it is a safe change.




[jira] [Commented] (FLINK-2436) Make ByteStreamStateHandles more robust

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649398#comment-14649398
 ] 

ASF GitHub Bot commented on FLINK-2436:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/958#issuecomment-126733653
  
If you could take a quick look @uce,  I would like to merge this :)

I think it is a safe change.


 Make ByteStreamStateHandles more robust
 ---

 Key: FLINK-2436
 URL: https://issues.apache.org/jira/browse/FLINK-2436
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Gyula Fora
Assignee: Gyula Fora
Priority: Critical

 The ByteStreamStateHandles are currently implemented in a very specific way 
 that makes assumptions of their actual usage. Also there is a possible 
 resource leak when errors occur during writing or reading from the stream.
 This affects FileStateHandles as well of course.





[jira] [Updated] (FLINK-2452) Add a playcount threshold to the MusicProfiles example

2015-07-31 Thread Vasia Kalavri (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vasia Kalavri updated FLINK-2452:
-
Summary: Add a playcount threshold to the MusicProfiles example  (was: Add 
a playcount threshold to the MusicProfiles examples)

 Add a playcount threshold to the MusicProfiles example
 --

 Key: FLINK-2452
 URL: https://issues.apache.org/jira/browse/FLINK-2452
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.10
Reporter: Vasia Kalavri
Assignee: Vasia Kalavri
Priority: Minor

 In the MusicProfiles example, when creating the user-user similarity graph, 
 an edge is created between any 2 users that have listened to the same song 
 (even if once). Depending on the input data, this might produce a projection 
 graph with many more edges than the original user-song graph.
 To make this computation more efficient, this issue proposes adding a 
 user-defined parameter that filters out songs that a user has listened to 
 only a few times. Essentially, it is a threshold for playcount, above which a 
 user is considered to like a song.
 For reference, with a threshold value of 30, the whole Last.fm dataset is 
 analyzed on my laptop in a few minutes, while no threshold results in a 
 runtime of several hours.
 There are many solutions to this problem, but since this is just an example 
 (not a library method), I think that keeping it simple is important.
 Thanks to [~andralungu] for spotting the inefficiency!
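The proposed threshold can be sketched as a plain filter over (user, song, playcount) triplets before building the similarity projection; the class and method names below are illustrative stand-ins for the Gelly example's edge type, not the example's actual code:

```java
import java.util.ArrayList;
import java.util.List;

public class PlaycountThreshold {
    // A (user, song, playcount) triplet standing in for a Gelly edge value.
    static class Listen {
        final String user;
        final String song;
        final int playcount;
        Listen(String user, String song, int playcount) {
            this.user = user;
            this.song = song;
            this.playcount = playcount;
        }
    }

    // Keep only listens above the threshold, so the user-user similarity
    // projection is built only from songs a user is considered to "like".
    static List<Listen> filterByThreshold(List<Listen> listens, int threshold) {
        List<Listen> kept = new ArrayList<>();
        for (Listen l : listens) {
            if (l.playcount > threshold) {
                kept.add(l);
            }
        }
        return kept;
    }
}
```

Dropping low-playcount edges before the projection is what shrinks the user-user graph and gives the minutes-vs-hours runtime difference described above.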





[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests

2015-07-31 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649444#comment-14649444
 ] 

Chesnay Schepler commented on FLINK-2449:
-

fair enough, scratch the part about skipping the test.

 DistCache doesn't work with JavaProgram Collection tests
 

 Key: FLINK-2449
 URL: https://issues.apache.org/jira/browse/FLINK-2449
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When attempting to retrieve a file during the Collection test in 
 JavaProgramTestBase you'll encounter an exception claiming the file was never 
 registered.





[GitHub] flink pull request: Framesize fix

2015-07-31 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/934#discussion_r35991476
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -979,8 +1174,7 @@ public void 
scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
if (execution == null) {
fail(new IllegalStateException("Cannot find execution for execution ID " +
partitionId.getPartitionId()));
-   }
-   else if (execution.getVertex() == null){
+   } else if (execution.getVertex() == null) {
--- End diff --

Same as above.




[GitHub] flink pull request: Framesize fix

2015-07-31 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/934#discussion_r35991467
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -918,8 +1109,7 @@ private void postRunCleanup() {
if (coord != null) {
coord.shutdown();
}
-   }
-   catch (Exception e) {
+   } catch (Exception e) {
--- End diff --

Same as above.




[GitHub] flink pull request: Framesize fix

2015-07-31 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/934#discussion_r35991472
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -966,8 +1162,7 @@ public boolean updateState(TaskExecutionState state) {
attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
return false;
}
-   }
-   else {
+   } else {
--- End diff --

Same as above.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649358#comment-14649358
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35985033
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling with 
replacement, and with only one pass through
+ * the input iteration whose size is unpredictable.
+ * This implementation refers to the algorithm described in
+ * "Reservoir-based Random Sampling with Replacement from Data Stream".
--- End diff --

Maybe put a link to the paper in here.


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.
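As a reference point for the requirements above, one-pass sampling without replacement from an iterator of unknown size can be sketched with the classic Algorithm R. This is illustrative only; the PR under review implements this (plus a with-replacement variant) as `RandomSampler` subclasses:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

public class ReservoirSketch {
    // Algorithm R: single pass, fixed-size uniform sample without
    // replacement from an iterator whose length is unknown in advance.
    static <T> List<T> sample(Iterator<T> input, int numSamples, long seed) {
        Random random = new Random(seed); // seed gives reproducibility
        List<T> reservoir = new ArrayList<>(numSamples);
        int seen = 0;
        while (input.hasNext()) {
            T element = input.next();
            seen++;
            if (reservoir.size() < numSamples) {
                reservoir.add(element);
            } else {
                // Replace a random slot with probability numSamples / seen,
                // which keeps every element equally likely to survive.
                int slot = random.nextInt(seen);
                if (slot < numSamples) {
                    reservoir.set(slot, element);
                }
            }
        }
        return reservoir;
    }
}
```

If the input has fewer elements than the requested sample size, the whole input is returned, which matches the usual reservoir-sampling contract.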





[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35985116
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling with 
replacement, and with only one pass through
+ * the input iteration whose size is unpredictable.
+ * This implementation refers to the algorithm described in
+ * "Reservoir-based Random Sampling with Replacement from Data Stream".
+ *
+ * @param <T> the type of sample.
+ */
+public class ReservoirSamplerWithReplacement<T> extends RandomSampler<T> {
+   private final int numSamples;
+   private final Random random;
+   private PoissonDistribution poissonDistribution;
+   private List<Integer> positions;
+   
+   /**
+* Create a reservoir sampler with fixed sample size and default random 
number generator.
+*
+* @param numSamples number of samples to retain in reservoir, must be 
non-negative.
+*/
+   public ReservoirSamplerWithReplacement(int numSamples) {
+   this(numSamples, new Random());
+   }
+   
+   /**
+* Create a reservoir sampler with fixed sample size and random number 
generator seed.
+*
+* @param numSamples number of samples to retain in reservoir, must be 
non-negative.
+* @param seed   random number generator seed
+*/
+   public ReservoirSamplerWithReplacement(int numSamples, long seed) {
+   this(numSamples, new Random(seed));
+   }
+   
+   /**
+* Create a reservoir sampler with fixed sample size and random number 
generator.
+*
+* @param numSamples number of samples to retain in reservoir, must be 
non-negative.
--- End diff --

@param is missing




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649360#comment-14649360
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35985116
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling with 
replacement, and with only one pass through
+ * the input iteration whose size is unpredictable.
+ * This implementation refers to the algorithm described in
+ * "Reservoir-based Random Sampling with Replacement from Data Stream".
+ *
+ * @param <T> the type of sample.
+ */
+public class ReservoirSamplerWithReplacement<T> extends RandomSampler<T> {
+   private final int numSamples;
+   private final Random random;
+   private PoissonDistribution poissonDistribution;
+   private List<Integer> positions;
+   
+   /**
+* Create a reservoir sampler with fixed sample size and default random 
number generator.
+*
+* @param numSamples number of samples to retain in reservoir, must be 
non-negative.
+*/
+   public ReservoirSamplerWithReplacement(int numSamples) {
+   this(numSamples, new Random());
+   }
+   
+   /**
+* Create a reservoir sampler with fixed sample size and random number 
generator seed.
+*
+* @param numSamples number of samples to retain in reservoir, must be 
non-negative.
+* @param seed   random number generator seed
+*/
+   public ReservoirSamplerWithReplacement(int numSamples, long seed) {
+   this(numSamples, new Random(seed));
+   }
+   
+   /**
+* Create a reservoir sampler with fixed sample size and random number 
generator.
+*
+* @param numSamples number of samples to retain in reservoir, must be 
non-negative.
--- End diff --

@param is missing


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.





[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests

2015-07-31 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649392#comment-14649392
 ] 

Sachin Goel commented on FLINK-2449:


You can call {{skipCollectionExecution}} :)

 DistCache doesn't work with JavaProgram Collection tests
 

 Key: FLINK-2449
 URL: https://issues.apache.org/jira/browse/FLINK-2449
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When attempting to retrieve a file during the Collection test in 
 JavaProgramTestBase you'll encounter an exception claiming the file was never 
 registered.





[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649429#comment-14649429
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-126740651
  
Thanks for your contribution @ChengXiangLi. The code is really well tested 
and well structured. Great work :-)

I had only some minor comments. There is however one thing I'm not so sure 
about. With the current implementation, all parallel tasks of the sampling 
operator will get the same random generator/seed value. Thus, every node will 
generate the same sequence of random numbers. I think this can have a negative 
influence on the sampling. What we could do is to use 
`RichMapPartitionFunction` instead of the `MapPartitionFunction`. With the rich 
function, we either have access to the subtask index, given by 
`getRuntimeContext().getIndexOfThisSubtask()`,  which we could use to modify 
the initial seed or we generate the random number generator in the `open` 
method (this method is executed on the TaskManager). Assuming that the clocks 
are not completely synchronized and that the individual tasks will be 
instantiated not at the same time, this could give us less correlated random 
number sequences. What do you think? 
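The seed-decorrelation idea suggested above can be sketched as follows. `getRuntimeContext().getIndexOfThisSubtask()` is the Flink call mentioned in the comment; the mixing formula itself is an illustrative assumption, not what the PR ultimately does:

```java
import java.util.Random;

public class SubtaskSeeding {
    // Derive a distinct but reproducible seed per parallel subtask from one
    // user-supplied seed. In a RichMapPartitionFunction's open() method, the
    // index would come from getRuntimeContext().getIndexOfThisSubtask().
    static Random seededRandomFor(long baseSeed, int subtaskIndex) {
        // Mix the subtask index into the base seed so each parallel
        // instance draws a different, yet reproducible, random sequence.
        return new Random(baseSeed * 31L + subtaskIndex);
    }
}
```

This keeps the user-facing seed semantics (same seed, same result per subtask) while avoiding identical random sequences across all parallel instances.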


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.





[GitHub] flink pull request: Framesize fix

2015-07-31 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/934#discussion_r35991266
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -75,24 +78,24 @@
  * The execution graph is the central data structure that coordinates the 
distributed
  * execution of a data flow. It keeps representations of each parallel 
task, each
  * intermediate result, and the communication between them.
- *
+ * <p/>
  * The execution graph consists of the following constructs:
  * <ul>
- *     <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like
- *         map or join) during execution. It holds the aggregated state of all parallel subtasks.
- *         The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes
- *         from the JobGraph's corresponding JobVertex.</li>
- *     <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are
- *         as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by
- *         the ExecutionJobVertex and the number of the parallel subtask</li>
- *     <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions
- *         for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed
- *         because it is no longer available when requested by later operations. An Execution is always
- *         identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager
- *         about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
- *         address the message receiver.</li>
+ * <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like
+ * map or join) during execution. It holds the aggregated state of all parallel subtasks.
+ * The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes
+ * from the JobGraph's corresponding JobVertex.</li>
+ * <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are
+ * as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by
+ * the ExecutionJobVertex and the number of the parallel subtask</li>
+ * <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions
+ * for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed
+ * because it is no longer available when requested by later operations. An Execution is always
+ * identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager
+ * about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
+ * address the message receiver.</li>

This change seems unnecessary.




[GitHub] flink pull request: Framesize fix

2015-07-31 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/934#discussion_r35991305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -560,53 +621,177 @@ public ExecutionContext getExecutionContext() {
}
 
/**
+* This works as cache for already merged accumulators, as, in some 
cases,
+* we do not want to remerge accumulators as this may lead to duplicate 
entries.
+*/
+   private Map<String, Accumulator<?, ?>> mergedSmallUserAccumulators;
+
+   /**
 * Merges all accumulator results from the tasks previously executed in the Executions.
+*
 * @return The accumulator map
 */
-   public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
+   public Map<String, Accumulator<?, ?>> aggregateSmallUserAccumulators() {
+   return aggregateSmallUserAccumulators(true);
+   }
 
-   Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
+   /**
+* Merges all accumulator results from the tasks previously executed in the Executions.
+* If <code>reaggregate</code> is set to false, then no aggregation is performed, and
+* the cache merge result is returned. Otherwise accumulators are merged.
+*
+* @param reaggregate <code>true</code> if we want to aggregate accumulators,
+*<code>false</code> otherwise.
+* @return The accumulator map
+*/
+   public Map<String, Accumulator<?, ?>> aggregateSmallUserAccumulators(boolean reaggregate) {
+   if (!reaggregate) {
+   return mergedSmallUserAccumulators;
+   }
+   this.mergedSmallUserAccumulators = new HashMap<String, Accumulator<?, ?>>();
 
for (ExecutionVertex vertex : getAllExecutionVertices()) {
-   Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
+   Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getSmallUserAccumulators();
if (next != null) {
-   AccumulatorHelper.mergeInto(userAccumulators, next);
+   AccumulatorHelper.mergeInto(mergedSmallUserAccumulators, next);
}
}
+   return mergedSmallUserAccumulators;
+   }
 
-   return userAccumulators;
+   /**
+* Merges all blobKeys referring to blobs of large accumulators. These refer to blobs in the
+* blobCache holding accumulators (results of tasks) that did not fit in an akka frame,
+* thus had to be sent through the BlobCache.
+*
+* @return The accumulator map
+*/
+   public Map<String, List<BlobKey>> aggregateLargeUserAccumulatorBlobKeys() {
+   Map<String, List<BlobKey>> largeUserAccumulatorRefs = new HashMap<String, List<BlobKey>>();
+
+   for (ExecutionVertex vertex : getAllExecutionVertices()) {
+   Map<String, List<BlobKey>> next = vertex.getCurrentExecutionAttempt().getLargeUserAccumulatorBlobKeys();
+   mergeLargeUserAccumulatorBlobKeys(largeUserAccumulatorRefs, next);
+   }
+   return largeUserAccumulatorRefs;
}
 
/**
-* Gets a serialized accumulator map.
+* Adds new blobKeys referring to blobs of large accumulators to the already existing ones.
+* These refer to blobs in the blobCache holding accumulators (results of tasks) that did not
+* fit in an akka frame, thus had to be sent through the BlobCache.
+*
+* @param target  the initial blobKey map
+* @param toMerge the new keys to add to the initial map
+* @return The resulting accumulator map
+*/
+   public Map<String, List<BlobKey>> addLargeUserAccumulatorBlobKeys(
+   Map<String, List<BlobKey>> target, Map<String, List<BlobKey>> toMerge) {
+   if (target == null) {
+   target = new HashMap<String, List<BlobKey>>();
+   }
+   mergeLargeUserAccumulatorBlobKeys(target, toMerge);
+   return target;
+   }
+
+   private void mergeLargeUserAccumulatorBlobKeys(
+   Map<String, List<BlobKey>> target, Map<String, List<BlobKey>> toMerge) {
+   if (toMerge == null || toMerge.isEmpty()) {
+   return;
+   }
+
+   for (Map.Entry<String, List<BlobKey>> otherEntry : toMerge.entrySet()) {
+   List<BlobKey> existing = target.get(otherEntry.getKey());
+   if (existing == null) {
+   target.put(otherEntry.getKey(), otherEntry.getValue());
+   } else 

[GitHub] flink pull request: Framesize fix

2015-07-31 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/934#discussion_r35991400
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -619,6 +718,21 @@ public ExecutionContext getExecutionContext() {
resultStrings[i++] = result;
}
 
+   for (Map.Entry<String, List<BlobKey>> entry : largeAccumulatorMap.entrySet()) {
+
+   if (!smallAccumulatorMap.containsKey(entry.getKey())) {
+   StringBuilder str = new StringBuilder();
+   str.append("BlobKeys=[ ");
+   for (BlobKey bk : entry.getValue()) {
+   str.append(bk + " ");
+   }
+   str.append("]");
+
+   StringifiedAccumulatorResult result =
+   new StringifiedAccumulatorResult(entry.getKey(), "Blob/Serialized", str.toString());
+   resultStrings[i++] = result;
+   }
+   }
--- End diff --

Ah ok. You're running into the same problem with the framesize here again.


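The framesize problem referenced in this review is about task results that are too large for an akka frame, which are shipped through the BlobCache and addressed by blob keys instead. A hedged, Flink-independent sketch of that size-based routing (the store, threshold, and names are stand-ins for Flink's actual BlobCache machinery):

```java
import java.util.HashMap;
import java.util.Map;

public class FramesizeRouting {
    // Stand-in for the BlobCache: name -> serialized payload.
    static final Map<String, byte[]> BLOB_STORE = new HashMap<>();

    // Route a serialized accumulator either inline (fits in a frame)
    // or via the blob store, returning the blob key in the latter case.
    static Object routeResult(String name, byte[] payload, int maxFrameBytes) {
        if (payload.length <= maxFrameBytes) {
            return payload;            // small: send directly in the message
        }
        BLOB_STORE.put(name, payload); // large: upload, send only the key
        return name;
    }
}
```

The reviewer's point is that stringifying the blob keys themselves can again approach the frame limit, so the routing decision has to be applied consistently on that path too.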


[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests

2015-07-31 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649315#comment-14649315
 ] 

Sachin Goel commented on FLINK-2449:


Collection Executor runs locally without any JobManager or TaskManager, so there 
would be no need for a DistributedCache in this case. You could just access any file 
locally.

 DistCache doesn't work with JavaProgram Collection tests
 

 Key: FLINK-2449
 URL: https://issues.apache.org/jira/browse/FLINK-2449
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When attempting to retrieve a file during the Collection test in 
 JavaProgramTestBase you'll encounter an exception claiming the file was never 
 registered.





[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests

2015-07-31 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649323#comment-14649323
 ] 

Chesnay Schepler commented on FLINK-2449:
-

I know I can. But this means that you cannot write a single test extending 
JavaProgramTestBase that uses the DistCache without hacking around to skip 
the CollectionTests, like I currently have to with the Python API.

The exception should be thrown much earlier, when registering the file, as opposed 
to when retrieving it.
I'd also argue that there should be an easy way to skip the Collection Test if 
it doesn't support all features.

 DistCache doesn't work with JavaProgram Collection tests
 

 Key: FLINK-2449
 URL: https://issues.apache.org/jira/browse/FLINK-2449
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

 When attempting to retrieve a file during the Collection test in 
 JavaProgramTestBase you'll encounter an exception claiming the file was never 
 registered.





[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984385
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on Poisson Distribution. While sample 
elements with replacement,
--- End diff --

While sampling elements


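The Poisson-based sampling with replacement discussed in this diff can be sketched independently of commons-math: each element of a stream of unknown length is emitted k times, with k drawn from Poisson(fraction). The Knuth-style inverse-transform draw below is an illustrative substitute for `PoissonDistribution.sample()`, not the PR's implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class PoissonSampleCount {
    // Knuth's algorithm: multiply uniform draws until the product
    // falls below e^-mean; the number of draws minus one is Poisson(mean).
    static int poissonDraw(double mean, Random random) {
        double l = Math.exp(-mean);
        int k = 0;
        double p = 1.0;
        do {
            k++;
            p *= random.nextDouble();
        } while (p > l);
        return k - 1;
    }

    // One pass over the input: emit each element a Poisson-distributed
    // number of times, which approximates sampling with replacement at
    // the given fraction without knowing the input size up front.
    static List<String> sampleWithReplacement(List<String> input, double fraction, long seed) {
        Random random = new Random(seed);
        List<String> out = new ArrayList<>();
        for (String element : input) {
            int copies = poissonDraw(fraction, random);
            for (int i = 0; i < copies; i++) {
                out.add(element);
            }
        }
        return out;
    }
}
```

Knuth's method is simple but runs in time proportional to the mean, which is why production implementations typically delegate to a library sampler such as commons-math's PoissonDistribution.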


[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984449
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on Poisson Distribution. While sample 
elements with replacement,
+ * the picked number of each element follow poisson distribution, so we 
could use poisson distribution
--- End diff --

Typo: element follows a given poisson distribution
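As the javadoc under review describes, sampling with replacement can be implemented by drawing, for each input element, a Poisson-distributed count of copies. A minimal self-contained sketch of that idea (illustrative only — it inlines Knuth's algorithm rather than using commons-math's `PoissonDistribution`, and the class and method names are invented):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class Main {
    // Knuth's algorithm for drawing a Poisson-distributed count with mean lambda.
    static int poisson(double lambda, Random random) {
        double limit = Math.exp(-lambda);
        int k = 0;
        double p = 1.0;
        do {
            k++;
            p *= random.nextDouble();
        } while (p > limit);
        return k - 1;
    }

    // Sampling with replacement: each input element is emitted a
    // Poisson(fraction)-distributed number of times.
    static <T> List<T> sample(List<T> input, double fraction, Random random) {
        List<T> result = new ArrayList<>();
        for (T element : input) {
            int copies = poisson(fraction, random);
            for (int i = 0; i < copies; i++) {
                result.add(element);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5);
        // With fraction 0 the drawn count is always zero, so nothing is emitted.
        System.out.println(sample(input, 0.0, new Random(7)).isEmpty());
    }
}
```

With a sample fraction of 0 the Poisson mean is 0 and the sketch emits no elements, which `main` checks.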




[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...

2015-07-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984320
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli trials. For sampling without replacement, each element's sample choice is just a Bernoulli trial.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+   
+   private final double fraction;
+   private final Random random;
+   
+   /**
+* Create a Bernoulli sampler with the given sample fraction and the default random number generator.
+*
+* @param fraction sample fraction, aka the Bernoulli sampler probability.
+*/
+   public BernoulliSampler(double fraction) {
+   this(fraction, new Random());
+   }
+   
+   /**
+* Create a Bernoulli sampler with the given sample fraction and a random number generator seed.
+*
+* @param fraction sample fraction, aka the Bernoulli sampler probability.
+* @param seed random number generator seed.
+*/
+   public BernoulliSampler(double fraction, long seed) {
+   this(fraction, new Random(seed));
+   }
+   
+   /**
+* Create a Bernoulli sampler with the given sample fraction and random number generator.
+*
+* @param fraction sample fraction, aka the Bernoulli sampler probability.
+* @param random   the random number generator.
+*/
+   public BernoulliSampler(double fraction, Random random) {
+   Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction must be between [0, 1].");
+   this.fraction = fraction;
+   this.random = random;
+   }
+   
+   /**
+* Sample the input elements; for each input element, take a Bernoulli trial for sampling.
+*
+* @param input elements to be sampled.
+* @return the sampled result, which is lazily computed upon the input elements.
+*/
+   @Override
+   public Iterator<T> sample(final Iterator<T> input) {
+   if (fraction == 0) {
+   return EMPTY_ITERABLE;
+   }
+   
+   return new SampledIterator<T>() {
+   T current;
+   
+   @Override
+   public boolean hasNext() {
+   if (current == null) {
+   while (input.hasNext()) {
+   T element = input.next();
if (random.nextDouble() <= fraction) {
+   current = element;
+   return true;
+   }
+   }
+   current = null;
+   return false;
+   }
+   return false;
--- End diff --

Could we put the last return statement in an `else` branch?
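The suggested restructuring might look like this (a self-contained sketch of the sampler under review, not the PR's exact code; it omits the `RandomSampler`/`SampledIterator` base classes and invents a static factory method for demonstration):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;

public class Main {
    // Bernoulli sampling without replacement over an iterator; hasNext is
    // written with the trailing return in an else branch, as suggested.
    static <T> Iterator<T> sample(Iterator<T> input, double fraction, Random random) {
        return new Iterator<T>() {
            private T current;

            @Override
            public boolean hasNext() {
                if (current == null) {
                    while (input.hasNext()) {
                        T element = input.next();
                        if (random.nextDouble() <= fraction) {
                            current = element;  // buffer the accepted element
                            return true;
                        }
                    }
                    return false;               // input exhausted
                } else {
                    return true;                // an element is already buffered
                }
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T result = current;
                current = null;
                return result;
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        Iterator<Integer> it = sample(data.iterator(), 0.5, new Random(42));
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        // Without replacement, never more elements than the input holds.
        System.out.println(count <= data.size());
    }
}
```

The `else` branch makes the two outcomes of `hasNext` explicit: either an element is already buffered, or the input must be scanned for the next accepted one.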




[GitHub] flink pull request: [FLINK-2446]Fix SocketTextStreamFunction has m...

2015-07-31 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/965


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2248) Allow disabling of sdtout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649215#comment-14649215
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/957


 Allow disabling of sdtout logging output
 

 Key: FLINK-2248
 URL: https://issues.apache.org/jira/browse/FLINK-2248
 Project: Flink
  Issue Type: Improvement
Reporter: Theodore Vasiloudis
Priority: Minor
  Labels: starter

 Currently when a job is submitted through the CLI we get in stdout all the 
 log output about each stage in the job.
 It would be useful to have an easy way to disable this output when submitting 
 the job, as most of the time we are only interested in the log output if 
 something goes wrong.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2248) Allow disabling of sdtout logging output

2015-07-31 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-2248.
-
   Resolution: Fixed
Fix Version/s: 0.9.1
   0.10

 Allow disabling of sdtout logging output
 

 Key: FLINK-2248
 URL: https://issues.apache.org/jira/browse/FLINK-2248
 Project: Flink
  Issue Type: Improvement
Reporter: Theodore Vasiloudis
Assignee: Sachin Goel
Priority: Minor
  Labels: starter
 Fix For: 0.10, 0.9.1


 Currently when a job is submitted through the CLI we get in stdout all the 
 log output about each stage in the job.
 It would be useful to have an easy way to disable this output when submitting 
 the job, as most of the time we are only interested in the log output if 
 something goes wrong.





[GitHub] flink pull request: Framesize fix

2015-07-31 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/934#issuecomment-126752412
  
Hi @kl0u ,

Thanks for addressing my comments. I've made some new ones :) but it looks much better now.

As for merging the different types of snapshots, the only solution I see now is to let all the accumulator results go through the BlobCache. That might cause some extra latency at the end of the job. I'm not sure whether we should proceed on that path, but the current solution has a lot of added overhead because we need to deal with two different kinds of update mechanisms.

I would like to hear some other opinions on whether we should have both 
update mechanisms (BlobCache and Akka message-based) or just let everything go 
through the BlobCache. I'm leaning more towards the latter if it is not too 
slow for slow accumulators.

I think you have to rebase to the latest master again because of some minor 
changes.




[jira] [Created] (FLINK-2454) Update Travis file to run build using Java7

2015-07-31 Thread Henry Saputra (JIRA)
Henry Saputra created FLINK-2454:


 Summary: Update Travis file to run build using Java7
 Key: FLINK-2454
 URL: https://issues.apache.org/jira/browse/FLINK-2454
 Project: Flink
  Issue Type: Sub-task
Reporter: Henry Saputra


Update Travis file to run build using Java7





[jira] [Created] (FLINK-2455) Misleading I/O manager error log messages

2015-07-31 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2455:
--

 Summary: Misleading I/O manager error log messages
 Key: FLINK-2455
 URL: https://issues.apache.org/jira/browse/FLINK-2455
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
 Fix For: 0.10, 0.9.1


The logs reported by [~andralungu] in FLINK-2412 show a lot of the following 
messages:

{code}
20:13:27,504 WARN  org.apache.flink.runtime.taskmanager.Task
 - Task 'CHAIN DataSource (at getEdgesDataSet(Degrees.java:64) 
(org.apache.flink.api.java.io.CsvInputFormat)) - Map (Map at 
getEdgesDataSet(Degrees.java:64)) (50/60)' did not react to cancelling signal, 
but is stuck in method:
 java.lang.Object.wait(Native Method)
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:126)
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:158)
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.release(SpillableSubpartition.java:130)
org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:300)
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:95)
org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:356)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:674)
java.lang.Thread.run(Thread.java:722)

20:13:27,583 ERROR 
org.apache.flink.runtime.io.network.partition.ResultPartition  - Error during 
release of result subpartition: Closing of asynchronous file channel was 
interrupted.
java.io.IOException: Closing of asynchronous file channel was interrupted.
at 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:130)
at 
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:158)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.release(SpillableSubpartition.java:130)
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:300)
at 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:95)
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:356)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:674)
at java.lang.Thread.run(Thread.java:722)
{code}

This is repeated for each subpartition during the release of a spillable 
partition (each subpartition is closed independently). The task is interrupted while 
waiting for the file channel to be closed.

{code}
20:15:50,329 ERROR 
org.apache.flink.runtime.io.network.partition.ResultPartition  - Error during 
release of result subpartition: IO-Manager has been closed.
java.io.IOException: IO-Manager has been closed.
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.shutdown(IOManagerAsync.java:424)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:125)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManager$1.run(IOManager.java:103)
{code}





[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2015-07-31 Thread Sheetal Parade (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649744#comment-14649744
 ] 

Sheetal Parade commented on FLINK-2314:
---

If no one is working on this issue, I would like to work.

 Make Streaming File Sources Persistent
 --

 Key: FLINK-2314
 URL: https://issues.apache.org/jira/browse/FLINK-2314
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.9
Reporter: Stephan Ewen
  Labels: easyfix, starter

 Streaming File sources should participate in the checkpointing. They should 
 track the bytes they read from the file and checkpoint it.
 One can look at the sequence generating source function for an example of a 
 checkpointed source.





[jira] [Created] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

2015-07-31 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2447:
--

 Summary: TypeExtractor returns wrong type info when a Tuple has 
two fields of the same POJO type
 Key: FLINK-2447
 URL: https://issues.apache.org/jira/browse/FLINK-2447
 Project: Flink
  Issue Type: Bug
Reporter: Gabor Gevay


Consider the following code:

DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
    @Override
    public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
        return null;
    }
});

where FooBarPojo is the following type:
public class FooBarPojo {
public int foo, bar;
public FooBarPojo() {}
}

This should print a tuple type with two identical fields:
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>>

But it prints the following instead:
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>>

Note, that this problem causes some co-groups in Gelly to crash with 
org.apache.flink.api.common.InvalidProgramException: The pair of co-group keys 
are not compatible with each other when the vertex ID type is a POJO, because 
the second field of the Edge type gets to be a generic type, but the POJO gets 
recognized in the Vertex type, and getNumberOfKeyFields returns different 
numbers for the POJO and the generic type.

The source of the problem is the mechanism in TypeExtractor that would detect 
recursive types (see the alreadySeen field in TypeExtractor), as it mistakes the second appearance of FooBarPojo for a recursive field.

Specifically the following happens: createTypeInfoWithTypeHierarchy
starts to process the Tuple2FooBarPojo, FooBarPojo type, and in line 434 it 
calls itself for the first field, which proceeds into the privateGetForClass 
case which correctly detects that it is a POJO, and correctly returns a 
PojoTypeInfo; but in the meantime in line 1191, privateGetForClass adds 
PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy 
approaches the second field, goes into privateGetForClass, which mistakenly 
returns a GenericTypeInfo, as it thinks in line 1187, that a recursive type is 
being processed.

(Note, that if we comment out the recursive type detection (the lines that do 
their thing with the alreadySeen field), then the output is correct.)
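The failure mode described above can be modeled with a toy sketch (illustrative only, not TypeExtractor's real code; the `analyze` helper and the per-path-set fix are invented for demonstration):

```java
import java.util.HashSet;
import java.util.Set;

public class Main {
    // Models recursion detection during type analysis: a type already in the
    // "seen" set is treated as recursive and falls back to a generic type.
    static String analyze(String type, Set<String> seenOnPath) {
        if (seenOnPath.contains(type)) {
            return "GenericType<" + type + ">";   // mistaken for recursion
        }
        seenOnPath.add(type);
        return "PojoType<" + type + ">";
    }

    public static void main(String[] args) {
        // Buggy: one shared set across sibling tuple fields, so the second
        // occurrence of the same POJO type looks like a recursive reference.
        Set<String> shared = new HashSet<>();
        String buggy = analyze("FooBarPojo", shared) + ", " + analyze("FooBarPojo", shared);

        // Fixed: each sibling field starts from a copy of the ancestor path,
        // so only genuinely recursive nesting trips the detector.
        Set<String> path = new HashSet<>();
        String fixed = analyze("FooBarPojo", new HashSet<>(path)) + ", "
                + analyze("FooBarPojo", new HashSet<>(path));

        System.out.println(buggy);
        System.out.println(fixed);
    }
}
```

The first line reproduces the reported output shape (one `PojoType`, one `GenericType`); the second shows the expected result when the seen-set is scoped per type-hierarchy path rather than shared across sibling fields.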





[jira] [Updated] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

2015-07-31 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2447:
---
Description: 
Consider the following code:

DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
    @Override
    public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
        return null;
    }
});

where FooBarPojo is the following type:
public class FooBarPojo {
public int foo, bar;
public FooBarPojo() {}
}

This should print a tuple type with two identical fields:
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>>

But it prints the following instead:
Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>>

Note, that this problem causes some co-groups in Gelly to crash with 
org.apache.flink.api.common.InvalidProgramException: The pair of co-group keys 
are not compatible with each other when the vertex ID type is a POJO, because 
the second field of the Edge type gets to be a generic type, but the POJO gets 
recognized in the Vertex type, and getNumberOfKeyFields returns different 
numbers for the POJO and the generic type.

The source of the problem is the mechanism in TypeExtractor that would detect 
recursive types (see the alreadySeen field in TypeExtractor), as it mistakes the second appearance of FooBarPojo for a recursive field.

Specifically the following happens: createTypeInfoWithTypeHierarchy starts to 
process the Tuple2FooBarPojo, FooBarPojo type, and in line 434 it calls 
itself for the first field, which proceeds into the privateGetForClass case 
which correctly detects that it is a POJO, and correctly returns a 
PojoTypeInfo; but in the meantime in line 1191, privateGetForClass adds 
PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy 
approaches the second field, goes into privateGetForClass, which mistakenly 
returns a GenericTypeInfo, as it thinks in line 1187, that a recursive type is 
being processed.

(Note, that if we comment out the recursive type detection (the lines that do 
their thing with the alreadySeen field), then the output is correct.)



 TypeExtractor returns wrong type info when a Tuple has two fields of the same 
 POJO type
 

[jira] [Updated] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

2015-07-31 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-2447:
---
Component/s: Java API

 TypeExtractor returns wrong type info when a Tuple has two fields of the same 
 POJO type
 ---

 Key: FLINK-2447
 URL: https://issues.apache.org/jira/browse/FLINK-2447
 Project: Flink
  Issue Type: Bug
  Components: Java API
Reporter: Gabor Gevay

 Consider the following code:
 DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
 DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
     @Override
     public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
         return null;
     }
 });
 where FooBarPojo is the following type:
 public class FooBarPojo {
   public int foo, bar;
   public FooBarPojo() {}
 }
 This should print a tuple type with two identical fields:
 Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>>
 But it prints the following instead:
 Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>>
 Note, that this problem causes some co-groups in Gelly to crash with 
 org.apache.flink.api.common.InvalidProgramException: The pair of co-group 
 keys are not compatible with each other when the vertex ID type is a POJO, 
 because the second field of the Edge type gets to be a generic type, but the 
 POJO gets recognized in the Vertex type, and getNumberOfKeyFields returns 
 different numbers for the POJO and the generic type.
 The source of the problem is the mechanism in TypeExtractor that would detect 
 recursive types (see the alreadySeen field in TypeExtractor), as it mistakes the second appearance of FooBarPojo for a recursive field.
 Specifically the following happens: createTypeInfoWithTypeHierarchy
 starts to process the Tuple2FooBarPojo, FooBarPojo type, and in line 434 it 
 calls itself for the first field, which proceeds into the privateGetForClass 
 case which correctly detects that it is a POJO, and correctly returns a 
 PojoTypeInfo; but in the meantime in line 1191, privateGetForClass adds 
 PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy 
 approaches the second field, goes into privateGetForClass, which mistakenly 
 returns a GenericTypeInfo, as it thinks in line 1187, that a recursive type 
 is being processed.
 (Note, that if we comment out the recursive type detection (the lines that do 
 their thing with the alreadySeen field), then the output is correct.)





[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/957#discussion_r35966826
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java 
---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.apache.flink.client.CliFrontendTestUtils.getConfigDir;
+import static org.junit.Assert.fail;
+
+public class CliFrontendLoggingTest {
+
+   private static LocalFlinkMiniCluster cluster;
+   private static Configuration config;
+   private static String hostPort;
+   private ByteArrayOutputStream stream = new ByteArrayOutputStream();
+   private CliFrontend cli;
+   private OutputStream output;
+
+   @Before
+   public void setUp() throws Exception {
+   stream.reset();
+   output = System.out;
+   System.setOut(new PrintStream(stream));
+
+   config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+   config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
2);
+   
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
1);
+   
config.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, 
false);
hostPort = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ":" +
   config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+
+   try {
+   cluster = new LocalFlinkMiniCluster(config, false, 
StreamingMode.BATCH_ONLY);
+   }
+   catch (Exception e) {
+   e.printStackTrace();
fail("Setup of test actor system failed.");
+   }
+
+   cli = new CliFrontend(getConfigDir());
+   }
+
+   @After
+   public void shutDownActorSystem() {
+   System.setOut(new PrintStream(output));
--- End diff --

I think it should be `System.setOut(output)` here.
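For reference, the save/redirect/restore pattern the comment is about can be sketched as follows (a minimal illustration, not the test's actual code): the key point is that restoring should reinstate the originally saved `PrintStream` object itself rather than wrapping it in a new `PrintStream`.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class Main {
    public static void main(String[] args) {
        // Save the original stdout as a PrintStream so it can be restored as-is.
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        System.setOut(new PrintStream(buffer));  // redirect stdout into the buffer
        System.out.println("captured");

        System.setOut(original);                 // restore the very same object,
                                                 // no extra PrintStream wrapper
        System.out.println(buffer.toString().trim().equals("captured"));
    }
}
```

Wrapping the saved stream again would still print, but it adds a redundant layer and loses object identity with the original `System.out`.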




[jira] [Commented] (FLINK-2248) Allow disabling of sdtout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649117#comment-14649117
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/957#discussion_r35966826
  

I think it should be `System.setOut(output)` here.


 Allow disabling of sdtout logging output
 

 Key: FLINK-2248
 URL: https://issues.apache.org/jira/browse/FLINK-2248
 Project: Flink
  Issue Type: Improvement
Reporter: Theodore Vasiloudis
Priority: Minor
  Labels: starter

 Currently when a job is submitted through the CLI we get in stdout all the 
 log output about each stage in the job.
 It would be useful to have an easy way to disable this output when submitting 
 the job, as most of the time we are only interested in the log output if 
 something goes wrong.





[jira] [Commented] (FLINK-2248) Allow disabling of sdtout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649121#comment-14649121
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/957#discussion_r35967117
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java 
---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.apache.flink.client.CliFrontendTestUtils.getConfigDir;
+import static org.junit.Assert.fail;
+
+public class CliFrontendLoggingTest {
+
+	private static LocalFlinkMiniCluster cluster;
+	private static Configuration config;
+	private static String hostPort;
+	private ByteArrayOutputStream stream = new ByteArrayOutputStream();
+	private CliFrontend cli;
+	private OutputStream output;
+
+	@Before
+	public void setUp() throws Exception {
+		stream.reset();
+		output = System.out;
+		System.setOut(new PrintStream(stream));
+
+		config = new Configuration();
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
+		config.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false);
+		hostPort = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ":" +
+			config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+
+		try {
+			cluster = new LocalFlinkMiniCluster(config, false, StreamingMode.BATCH_ONLY);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Setup of test actor system failed.");
+		}
+
+		cli = new CliFrontend(getConfigDir());
+	}
+
+	@After
+	public void shutDownActorSystem() {
+		System.setOut(new PrintStream(output));
--- End diff --

System.setOut only takes a PrintStream as argument. 
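The pattern under discussion, capturing System.out in a test and restoring it afterwards, can be sketched standalone, outside the Flink test harness (class and variable names here are illustrative, not from the PR):

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class StdoutCaptureSketch {
    public static void main(String[] args) {
        // Save the original stream. System.out is already a PrintStream,
        // so it can later be restored directly via System.setOut(original).
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        System.setOut(new PrintStream(buffer));
        try {
            System.out.println("captured line"); // goes into the buffer
        } finally {
            System.setOut(original); // restore; no new PrintStream wrapper needed
        }
        String captured = buffer.toString();
        if (!captured.contains("captured line")) {
            throw new AssertionError("stdout capture failed");
        }
        System.out.println("captured: " + captured.trim());
    }
}
```

This also illustrates the reviewer's point: keeping the saved reference typed as PrintStream (rather than OutputStream) would let the test restore it without re-wrapping.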




[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649135#comment-14649135
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126669058
  
 There was no specific need for it. However, since the CliFrontend only 
passes the Client the configuration, I decided to include it with that.

There is a method on the Client to set the printing.

Further, I think while writing programs using flink [flink-example-esque] 
and running them in an IDE, it would be nice to have a config entry to turn off 
the log. It gets distracting sometimes.

If you want to achieve that, you can adjust the log4j.properties file and 
set logging to OFF.

I would be for excluding the config entry because I don't think people will 
use it.
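For reference, the log4j.properties adjustment mentioned above could look like the following sketch (the exact appender names depend on the default config shipped with the Flink distribution):

```properties
# Turn off all logging output, as suggested above
log4j.rootLogger=OFF

# Alternatively, keep only warnings and errors on the console appender:
# log4j.rootLogger=WARN, console
```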




[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/957#discussion_r35967492
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java 
---
+   System.setOut(new PrintStream(output));
--- End diff --

Yes, and System.out returns a PrintStream :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649137#comment-14649137
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/957#discussion_r35967492
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java 
---
+   System.setOut(new PrintStream(output));
--- End diff --

Yes, and System.out returns a PrintStream :)




[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/957#discussion_r35967625
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java 
---
+   System.setOut(new PrintStream(output));
--- End diff --

Ah. I just assumed it would be some kind of OutputStream. 




[jira] [Commented] (FLINK-2452) Add a playcount threshold to the MusicProfiles example

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14650010#comment-14650010
 ] 

ASF GitHub Bot commented on FLINK-2452:
---

GitHub user vasia opened a pull request:

https://github.com/apache/flink/pull/968

[FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example

This PR adds a user-defined parameter to the MusicProfiles example that 
filters out songs that a user has listened to only a few times. Essentially, it 
is a threshold for playcount, above which a user is considered to like a song.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vasia/flink music-profiles

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/968.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #968


commit c0c8463521912d021c392c2c5edc254fee267eb8
Author: vasia va...@apache.org
Date:   2015-07-31T20:12:18Z

[FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example




 Add a playcount threshold to the MusicProfiles example
 --

 Key: FLINK-2452
 URL: https://issues.apache.org/jira/browse/FLINK-2452
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 0.10
Reporter: Vasia Kalavri
Assignee: Vasia Kalavri
Priority: Minor

 In the MusicProfiles example, when creating the user-user similarity graph, 
 an edge is created between any 2 users that have listened to the same song 
 (even if once). Depending on the input data, this might produce a projection 
 graph with many more edges than the original user-song graph.
 To make this computation more efficient, this issue proposes adding a 
 user-defined parameter that filters out songs that a user has listened to 
 only a few times. Essentially, it is a threshold for playcount, above which a 
 user is considered to like a song.
 For reference, with a threshold value of 30, the whole Last.fm dataset is 
 analyzed on my laptop in a few minutes, while no threshold results in a 
 runtime of several hours.
 There are many solutions to this problem, but since this is just an example 
 (not a library method), I think that keeping it simple is important.
 Thanks to [~andralungu] for spotting the inefficiency!
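The thresholding idea described above is independent of Gelly; a minimal sketch of it in plain Java (with a hypothetical (user, song, playcount) triple, not the example's actual API) looks like this:

```java
import java.util.List;
import java.util.stream.Collectors;

public class PlaycountThresholdSketch {
    // Hypothetical triple: a user listened to a song some number of times
    record Listen(String user, String song, int playcount) {}

    // Keep only the user-song pairs whose playcount exceeds the threshold,
    // i.e. treat only those as "user likes song" edges
    static List<Listen> likedSongs(List<Listen> listens, int threshold) {
        return listens.stream()
                .filter(l -> l.playcount() > threshold)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Listen> listens = List.of(
                new Listen("u1", "s1", 45),
                new Listen("u1", "s2", 3),
                new Listen("u2", "s1", 31));
        // With threshold 30 (the value mentioned in the issue), two edges survive
        List<Listen> liked = likedSongs(listens, 30);
        if (liked.size() != 2) throw new AssertionError();
        System.out.println(liked.size()); // prints 2
    }
}
```

Filtering edges before building the user-user projection is what keeps the projected graph small, since the projection can otherwise have far more edges than the original bipartite graph.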





[GitHub] flink pull request: [FLINK-2452] [Gelly] adds a playcount threshol...

2015-07-31 Thread vasia
GitHub user vasia opened a pull request:

https://github.com/apache/flink/pull/968

[FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example

This PR adds a user-defined parameter to the MusicProfiles example that 
filters out songs that a user has listened to only a few times. Essentially, it 
is a threshold for playcount, above which a user is considered to like a song.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vasia/flink music-profiles

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/968.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #968


commit c0c8463521912d021c392c2c5edc254fee267eb8
Author: vasia va...@apache.org
Date:   2015-07-31T20:12:18Z

[FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example






[jira] [Updated] (FLINK-2412) Race leading to IndexOutOfBoundsException when querying for buffer while releasing SpillablePartition

2015-07-31 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-2412:
---
Summary: Race leading to IndexOutOfBoundsException when querying for buffer 
while releasing SpillablePartition  (was: Index Out of Bounds Exception)

 Race leading to IndexOutOfBoundsException when querying for buffer while 
 releasing SpillablePartition
 -

 Key: FLINK-2412
 URL: https://issues.apache.org/jira/browse/FLINK-2412
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.9, 0.10
Reporter: Andra Lungu
Assignee: Ufuk Celebi
Priority: Critical
 Fix For: 0.10, 0.9.1


 When running a code as simple as: 
  {noformat}
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Edge<String, NullValue>> edges = getEdgesDataSet(env);
  Graph<String, NullValue, NullValue> graph = Graph.fromDataSet(edges, env);
  DataSet<Tuple2<String, Long>> degrees = graph.getDegrees();
  degrees.writeAsCsv(outputPath, "\n", " ");
  env.execute();
  {noformat}
  on the Friendster data set: 
  https://snap.stanford.edu/data/com-Friendster.html, on 30 Wally nodes,
  I get the following exception:
 java.lang.Exception: The data preparation for task 'CoGroup (CoGroup at 
 inDegrees(Graph.java:701))' , caused an error: Error obtaining the sorted 
 input: Thread 'SortMerger Reading Thread' terminated due to an exception: 
 Fatal error at remote task manager 
 'wally028.cit.tu-berlin.de/130.149.249.38:53730'.
   at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
   at java.lang.Thread.run(Thread.java:722)
 Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
 Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal 
 error at remote task manager 'wally028.cit.tu-berlin.de/130.149.249.38:53730'.
   at 
 org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
   at 
 org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
   ... 3 more
 Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated 
 due to an exception: Fatal error at remote task manager 
 'wally028.cit.tu-berlin.de/130.149.249.38:53730'.
   at 
 org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
 Caused by: 
 org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
 Fatal error at remote task manager 
 'wally028.cit.tu-berlin.de/130.149.249.38:53730'.
   at 
 org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:227)
   at 
 org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:162)
   at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
   at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
   at 
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
   at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
   at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
   at 
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
   at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
   at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
   at 
 io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
   at 
 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
   

[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126612061
  
Thanks for adding the tests! This looks good to merge. Could you update 
https://ci.apache.org/projects/flink/flink-docs-master/apis/cli.html ?




[jira] [Updated] (FLINK-2361) CompactingHashTable loses entries

2015-07-31 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-2361:
-
Summary: CompactingHashTable loses entries  (was: flatMap + distinct gives 
erroneous results for big data sets)

 CompactingHashTable loses entries
 -

 Key: FLINK-2361
 URL: https://issues.apache.org/jira/browse/FLINK-2361
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Affects Versions: 0.10
Reporter: Andra Lungu

 When running the simple Connected Components algorithm (currently in Gelly) 
 on the twitter follower graph, with 1, 100 or 1 iterations, I get the 
 following error:
 Caused by: java.lang.Exception: Target vertex '657282846' does not exist!.
   at 
 org.apache.flink.graph.spargel.VertexCentricIteration$VertexUpdateUdfSimpleVV.coGroup(VertexCentricIteration.java:300)
   at 
 org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:220)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
   at 
 org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
   at 
 org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:107)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
   at java.lang.Thread.run(Thread.java:722)
 Now this is very bizarre, as the DataSet of vertices is produced from the 
 DataSet of edges... which means there cannot be an edge with an invalid 
 target id... The method calls flatMap to isolate the src and trg ids and 
 distinct to ensure their uniqueness.  
 The algorithm works fine for smaller data sets... 





[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648960#comment-14648960
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126614333
  
@mxm , updated.




[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126614333
  
@mxm , updated.




[jira] [Updated] (FLINK-2361) CompactingHashTable loses entries

2015-07-31 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-2361:
-
Priority: Critical  (was: Major)

 CompactingHashTable loses entries
 -

 Key: FLINK-2361
 URL: https://issues.apache.org/jira/browse/FLINK-2361
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Affects Versions: 0.10
Reporter: Andra Lungu
Priority: Critical



[jira] [Updated] (FLINK-2446) SocketTextStreamFunction has memory leak when reconnect server

2015-07-31 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi updated FLINK-2446:
--
Affects Version/s: (was: 0.8.1)
   0.10

 SocketTextStreamFunction has memory leak when reconnect server
 --

 Key: FLINK-2446
 URL: https://issues.apache.org/jira/browse/FLINK-2446
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: fangfengbin
Priority: Minor







[GitHub] flink pull request: [FLINK-2446]Fix SocketTextStreamFunction has m...

2015-07-31 Thread mbalassi
Github user mbalassi commented on the pull request:

https://github.com/apache/flink/pull/965#issuecomment-126619190
  
Good catch, if no objections I will merge this in the evening.




[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648971#comment-14648971
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126620784
  
Thanks. One more thing. Could you also update 
https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html ? 
You introduced a new config entry.


 Allow disabling of sdtout logging output
 

 Key: FLINK-2248
 URL: https://issues.apache.org/jira/browse/FLINK-2248
 Project: Flink
  Issue Type: Improvement
Reporter: Theodore Vasiloudis
Priority: Minor
  Labels: starter

 Currently when a job is submitted through the CLI we get in stdout all the 
 log output about each stage in the job.
 It would be useful to have an easy way to disable this output when submitting 
 the job, as most of the time we are only interested in the log output if 
 something goes wrong.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...

2015-07-31 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/957#issuecomment-126620784
  
Thanks. One more thing. Could you also update 
https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html ? 
You introduced a new config entry.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2446) SocketTextStreamFunction has memory leak when reconnect server

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648977#comment-14648977
 ] 

ASF GitHub Bot commented on FLINK-2446:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/965#issuecomment-12669
  
Nice catch! Looks good to merge.


 SocketTextStreamFunction has memory leak when reconnect server
 --

 Key: FLINK-2446
 URL: https://issues.apache.org/jira/browse/FLINK-2446
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: fangfengbin
Priority: Minor





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2166) Add fromCsvFile() to TableEnvironment

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648981#comment-14648981
 ] 

ASF GitHub Bot commented on FLINK-2166:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/939#issuecomment-126624411
  
Maybe we could move the TableEnvironment to org.apache.flink.api.table to 
make it clear that it can be used for both languages.


 Add fromCsvFile() to TableEnvironment
 -

 Key: FLINK-2166
 URL: https://issues.apache.org/jira/browse/FLINK-2166
 Project: Flink
  Issue Type: New Feature
  Components: Table API
Affects Versions: 0.9
Reporter: Fabian Hueske
Priority: Minor
  Labels: starter

 Add a {{fromCsvFile()}} method to the {{TableEnvironment}} to read a 
 {{Table}} from a CSV file.
 The implementation should reuse Flink's CsvInputFormat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2248) Allow disabling of sdtout logging output

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648945#comment-14648945
 ] 

ASF GitHub Bot commented on FLINK-2248:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/957#discussion_r35956433
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java 
---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.apache.flink.client.CliFrontendTestUtils.getConfigDir;
+import static org.junit.Assert.fail;
+
+public class CliFrontendLoggingTest {
+
+   private static LocalFlinkMiniCluster cluster;
+   private static Configuration config;
+   private static String hostPort;
+   private ByteArrayOutputStream stream = new ByteArrayOutputStream();
+   private CliFrontend cli;
+
+   @Before
+   public void setUp() throws Exception {
+   stream.reset();
+   System.setOut(new PrintStream(stream));
--- End diff --

You are overwriting stdout permanently here. You could save `System.out` to 
a variable here and set it again in the `@After` method.
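A minimal, self-contained sketch of the suggested fix, using a plain main method instead of JUnit's @Before/@After so it runs standalone (the class name is hypothetical): save the original System.out before redirecting, and restore it afterwards.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class StdoutRestoreExample {
    public static void main(String[] args) {
        PrintStream originalOut = System.out;      // save the real stdout first
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        System.setOut(new PrintStream(captured));  // redirect, as the test's setUp() does

        System.out.println("hello");               // goes into the buffer, not the console

        System.setOut(originalOut);                // restore, as suggested for the @After method
        System.out.println("captured: " + captured.toString().trim());
    }
}
```

In the JUnit test itself, the save would go in `setUp()` and the restore in the `@After` method, so later tests see the real stdout again.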


 Allow disabling of sdtout logging output
 

 Key: FLINK-2248
 URL: https://issues.apache.org/jira/browse/FLINK-2248
 Project: Flink
  Issue Type: Improvement
Reporter: Theodore Vasiloudis
Priority: Minor
  Labels: starter

 Currently when a job is submitted through the CLI we get in stdout all the 
 log output about each stage in the job.
 It would be useful to have an easy way to disable this output when submitting 
 the job, as most of the time we are only interested in the log output if 
 something goes wrong.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2450) IndexOutOfBoundsException in KryoSerializer

2015-07-31 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2450:
--

 Summary: IndexOutOfBoundsException in KryoSerializer
 Key: FLINK-2450
 URL: https://issues.apache.org/jira/browse/FLINK-2450
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
 Fix For: 0.10, 0.9.1


The following Exception has been reported by [~andralungu]:

{code}
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (CoGroup 
at groupReduceOnNeighbors(Graph.java:1405))' , caused an error: Error obtaining 
the sorted input: Thread 'SortMerger spilling thread' terminated due to an 
exception: Index: 53, Size: 0
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:476)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:366)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:576)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger spilling thread' terminated due to an exception: Index: 53, Size: 0
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
at 
org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1096)
at 
org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated 
due to an exception: Index: 53, Size: 0
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
Caused by: java.lang.IndexOutOfBoundsException: Index: 53, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:604)
at java.util.ArrayList.get(ArrayList.java:382)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:531)
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1328)
at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
{code}

See FLINK-2412 for example program and data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2166) Add fromCsvFile() to TableEnvironment

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648973#comment-14648973
 ] 

ASF GitHub Bot commented on FLINK-2166:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/939#issuecomment-126621482
  
Just my opinion: `TableEnvironment` is located under 
`org.apache.flink.api.java.table` to unify the Table API implementation. But 
the Table API is implemented in Scala, so I think using the Scala API is the 
proper choice here.

@aljoscha @fhueske What do you think about this?


 Add fromCsvFile() to TableEnvironment
 -

 Key: FLINK-2166
 URL: https://issues.apache.org/jira/browse/FLINK-2166
 Project: Flink
  Issue Type: New Feature
  Components: Table API
Affects Versions: 0.9
Reporter: Fabian Hueske
Priority: Minor
  Labels: starter

 Add a {{fromCsvFile()}} method to the {{TableEnvironment}} to read a 
 {{Table}} from a CSV file.
 The implementation should reuse Flink's CsvInputFormat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2166. Add fromCsvFile() method to TableE...

2015-07-31 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/939#issuecomment-126621482
  
Just my opinion: `TableEnvironment` is located under 
`org.apache.flink.api.java.table` to unify the Table API implementation. But 
the Table API is implemented in Scala, so I think using the Scala API is the 
proper choice here.

@aljoscha @fhueske What do you think about this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-2166. Add fromCsvFile() method to TableE...

2015-07-31 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/939#issuecomment-126624411
  
Maybe we could move the TableEnvironment to org.apache.flink.api.table to 
make it clear that it can be used for both languages.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

2015-07-31 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648906#comment-14648906
 ] 

Fabian Hueske commented on FLINK-2447:
--

Thanks for the detailed report!

[~twalthr] has done a lot of work in the TypeExtractor. Maybe he can comment on 
this problem.

 TypeExtractor returns wrong type info when a Tuple has two fields of the same 
 POJO type
 ---

 Key: FLINK-2447
 URL: https://issues.apache.org/jira/browse/FLINK-2447
 Project: Flink
  Issue Type: Bug
  Components: Java API
Reporter: Gabor Gevay

 Consider the following code:
 DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo());
 DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(
   new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() {
   @Override
   public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
   return null;
   }
   });
 where FooBarPojo is the following type:
 public class FooBarPojo {
   public int foo, bar;
   public FooBarPojo() {}
 }
 This should print a tuple type with two identical fields:
 Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, 
 PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>>
 But it prints the following instead:
 Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, 
 GenericType<FooBarPojo>>
 Note that this problem causes some co-groups in Gelly to crash with 
 org.apache.flink.api.common.InvalidProgramException: The pair of co-group 
 keys are not compatible with each other when the vertex ID type is a POJO, 
 because the second field of the Edge type becomes a generic type while the 
 POJO gets recognized in the Vertex type, and getNumberOfKeyFields returns 
 different numbers for the POJO and the generic type.
 The source of the problem is the mechanism in TypeExtractor that detects 
 recursive types (see the alreadySeen field in TypeExtractor), as it 
 mistakes the second appearance of FooBarPojo for a recursive field.
 Specifically, the following happens: createTypeInfoWithTypeHierarchy starts to 
 process the Tuple2<FooBarPojo, FooBarPojo> type, and in line 434 it calls 
 itself for the first field, which proceeds into the privateGetForClass case 
 that correctly detects that it is a POJO and correctly returns a 
 PojoTypeInfo; but in the meantime, in line 1191, privateGetForClass adds the 
 PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy 
 approaches the second field, goes into privateGetForClass, which mistakenly 
 returns a GenericTypeInfo, as it thinks in line 1187 that a recursive type 
 is being processed.
 (Note that if we comment out the recursive type detection (the lines that do 
 their thing with the alreadySeen field), then the output is correct.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-07-31 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-2448:
---

 Summary: registerCacheFile fails with MultipleProgramsTestbase
 Key: FLINK-2448
 URL: https://issues.apache.org/jira/browse/FLINK-2448
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Chesnay Schepler
Priority: Minor


When trying to register a file using a constant name, an exception is thrown 
saying the file was already cached.

This is probably because the same environment is reused, and the cacheFile 
entries are not cleared between runs.
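A plain-Java sketch of the suspected failure mode, with a hypothetical CachedFileRegistry class standing in for the reused environment's cache-file bookkeeping (the method names mirror the description above but are illustrative, not Flink's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class CachedFileRegistry {
    // Entries survive across runs because the same "environment" is reused.
    private final Map<String, String> cacheFiles = new HashMap<>();

    void registerCachedFile(String path, String name) {
        if (cacheFiles.containsKey(name)) {
            // Second registration under the same constant name fails.
            throw new IllegalStateException(
                    "File with name '" + name + "' was already cached.");
        }
        cacheFiles.put(name, path);
    }

    void clear() { // what would need to happen between test runs
        cacheFiles.clear();
    }

    public static void main(String[] args) {
        CachedFileRegistry env = new CachedFileRegistry(); // shared, reused environment
        env.registerCachedFile("/tmp/a.txt", "data");      // first run: ok
        try {
            env.registerCachedFile("/tmp/a.txt", "data");  // second run: same name fails
        } catch (IllegalStateException e) {
            System.out.println("second registration rejected");
        }
        env.clear();                                        // clearing between runs
        env.registerCachedFile("/tmp/a.txt", "data");       // now succeeds again
    }
}
```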



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

