[jira] [Updated] (FLINK-1994) Add different gain calculation schemes to SGD
[ https://issues.apache.org/jira/browse/FLINK-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-1994:
Assignee: Trevor Grant

Add different gain calculation schemes to SGD
Key: FLINK-1994
URL: https://issues.apache.org/jira/browse/FLINK-1994
Project: Flink
Issue Type: Improvement
Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Trevor Grant
Priority: Minor
Labels: ML, Starter

The current SGD implementation uses the formula {{stepsize/sqrt(iterationNumber)}} as the gain for the weight updates. It would be good to make the gain calculation configurable and to provide different strategies for it. For example:
* stepsize/(1 + iterationNumber)
* stepsize*(1 + regularization * stepsize * iterationNumber)^(-3/4)

See also how to properly select the gains [1].

Resources:
[1] http://arxiv.org/pdf/1107.2490.pdf

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
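The gain strategies listed in the issue can be sketched as plain functions. This is an illustrative sketch, not Flink's actual SGD code; the class and method names are hypothetical.

```java
// Hypothetical sketch of pluggable gain (learning-rate) schedules for SGD,
// mirroring the strategies listed in FLINK-1994. Not part of Flink's API.
public class LearningRateSchedules {

    /** The current default: stepsize / sqrt(iterationNumber). */
    public static double inverseSqrt(double stepsize, int iteration) {
        return stepsize / Math.sqrt(iteration);
    }

    /** First proposed alternative: stepsize / (1 + iterationNumber). */
    public static double inverseLinear(double stepsize, int iteration) {
        return stepsize / (1.0 + iteration);
    }

    /** Second alternative: stepsize * (1 + regularization * stepsize * iteration)^(-3/4). */
    public static double regularized(double stepsize, double regularization, int iteration) {
        return stepsize * Math.pow(1.0 + regularization * stepsize * iteration, -0.75);
    }
}
```

Making the gain a strategy object (or a function reference like these) would let the solver stay unchanged while users swap schedules, which is the configurability the issue asks for.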
[jira] [Commented] (FLINK-1994) Add different gain calculation schemes to SGD
[ https://issues.apache.org/jira/browse/FLINK-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649172#comment-14649172 ]

Till Rohrmann commented on FLINK-1994:
Great to hear [~rawkintrevo]. I assigned the issue to you.
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/957

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
[jira] [Updated] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maximilian Michels updated FLINK-2248:
Assignee: Sachin Goel

Allow disabling of stdout logging output
Key: FLINK-2248
URL: https://issues.apache.org/jira/browse/FLINK-2248
Project: Flink
Issue Type: Improvement
Reporter: Theodore Vasiloudis
Assignee: Sachin Goel
Priority: Minor
Labels: starter

Currently, when a job is submitted through the CLI, we get on stdout all the log output about each stage in the job. It would be useful to have an easy way to disable this output when submitting the job, as most of the time we are only interested in the log output if something goes wrong.
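The behaviour the issue requests can be sketched as a simple gate on status printing. This is a hypothetical sketch; the flag names (`-q`/`--quiet`) and class name are illustrative and are not the actual Flink CLI options.

```java
// Hypothetical sketch of FLINK-2248: suppress per-stage status output on stdout
// when a quiet flag is passed. Flag names are assumptions, not Flink's real options.
public class CliSketch {

    /** Returns false if the (hypothetical) quiet flag is present in the CLI args. */
    public static boolean sysoutLoggingEnabled(String[] args) {
        for (String arg : args) {
            if ("-q".equals(arg) || "--quiet".equals(arg)) {
                return false;
            }
        }
        return true;
    }

    /** Prints a job-status line only when sysout logging is enabled. */
    public static void maybePrintStatus(String[] args, String status) {
        if (sysoutLoggingEnabled(args)) {
            System.out.println(status);
        }
    }
}
```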
[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/966#issuecomment-126705428

Thanks for the pull request! You did not convert all InputFormats and OutputFormats; there are a few more, e.g. HadoopInputFormatBase or JDBCInputFormat. Other than that, your pull request looks good. I think it is fine to keep the notion of normal and rich, just like we do for the operator user functions. That also means that we do not break the existing interfaces.
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649156#comment-14649156 ]

ASF GitHub Bot commented on FLINK-2248:
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126674684

Thank you for your patience :) I will merge this once the tests have passed.
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126674684

Thank you for your patience :) I will merge this once the tests have passed.
[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/966#issuecomment-126706999

Ah yes. I'll update them in a while. There's actually some problem with the unit test I've written too: Travis fails sporadically.
[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats
[ https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649271#comment-14649271 ]

ASF GitHub Bot commented on FLINK-1819:
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/966#issuecomment-126706999

Ah yes. I'll update them in a while. There's actually some problem with the unit test I've written too: Travis fails sporadically.

Allow access to RuntimeContext from Input and OutputFormats
Key: FLINK-1819
URL: https://issues.apache.org/jira/browse/FLINK-1819
Project: Flink
Issue Type: Improvement
Components: Local Runtime
Affects Versions: 0.9, 0.8.1
Reporter: Fabian Hueske
Priority: Minor
Fix For: 0.9

User functions that extend a RichFunction can access a {{RuntimeContext}}, which gives the parallel id of the task and access to Accumulators and BroadcastVariables. Right now, Input- and OutputFormats cannot access their {{RuntimeContext}}.
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649146#comment-14649146 ]

ASF GitHub Bot commented on FLINK-2248:
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126672392

Yeah. I think we should remove it then. Too many entries tend to confuse people.
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126672392

Yeah. I think we should remove it then. Too many entries tend to confuse people.
[GitHub] flink pull request: [FLINK-1819][core]Allow access to RuntimeConte...
GitHub user sachingoel0101 opened a pull request: https://github.com/apache/flink/pull/966

[FLINK-1819][core] Allow access to RuntimeContext from Input and Output formats

1. Introduces new Rich Input and Output formats, similar to Rich Functions.
2. Makes all existing input and output formats rich, without any API-breaking changes.
3. Provides RuntimeContext to GenericSink and GenericSource operators.
4. DataSourceTask and DataSinkTask set DistributedUDFContext on their input and output formats, respectively.
5. Complete access to RuntimeContext, including access to accumulators.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sachingoel0101/flink flink-1819

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/966.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #966

commit 627ee290fb88ac3e670fe86a734116b06f431ade
Author: Sachin Goel <sachingoel0...@gmail.com>
Date: 2015-07-31T04:36:20Z
[FLINK-1819][core] Allow access to RuntimeContext from Input and Output formats
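The "rich" pattern the PR describes — plain formats keep their interface, while rich variants gain set/get access to a runtime context, mirroring RichFunction — can be sketched in miniature. These are simplified stand-ins, not the actual Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for the rich-format pattern from FLINK-1819. The class and
// field names are illustrative only; the real Flink types differ.
public class RichFormatSketch {

    /** Stand-in for Flink's RuntimeContext: task info plus accumulators. */
    public static class RuntimeContext {
        public final int subtaskIndex;
        public final Map<String, Long> accumulators = new HashMap<>();

        public RuntimeContext(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }
    }

    /** Base format: public interface stays unchanged. */
    public abstract static class InputFormat {
        public abstract String nextRecord();
    }

    /** Rich variant: adds RuntimeContext access, mirroring RichFunction. */
    public abstract static class RichInputFormat extends InputFormat {
        private RuntimeContext context;

        public void setRuntimeContext(RuntimeContext context) {
            this.context = context;
        }

        public RuntimeContext getRuntimeContext() {
            return context;
        }
    }

    /** Example rich format that counts records via an accumulator. */
    public static class CountingFormat extends RichInputFormat {
        @Override
        public String nextRecord() {
            getRuntimeContext().accumulators.merge("records", 1L, Long::sum);
            return "record";
        }
    }
}
```

Because the rich class extends the plain one, existing formats keep working untouched, which matches item 2 of the PR description (no API-breaking changes).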
[jira] [Commented] (FLINK-2248) Allow disabling of sdtout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649165#comment-14649165 ]

ASF GitHub Bot commented on FLINK-2248:
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126675074

Sure. No problem. :)
[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats
[ https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649185#comment-14649185 ]

ASF GitHub Bot commented on FLINK-1819:
GitHub user sachingoel0101 opened a pull request: https://github.com/apache/flink/pull/966 [FLINK-1819][core] Allow access to RuntimeContext from Input and Output formats
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126672811

I've updated the code to remove any changes in Configuration.
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649147#comment-14649147 ]

ASF GitHub Bot commented on FLINK-2248:
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126672811

I've updated the code to remove any changes in Configuration.
[GitHub] flink pull request: [FLINK-2409] [webserver] Replaces ActorRefs wi...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/959#issuecomment-126698555

Travis failed only in one build, where the `YARNSessionFIFOITCase` did not start. I suspect that it's due to the instability of the `YARNSessionFIFOITCase` test.
[jira] [Commented] (FLINK-2409) Old JM web interface is sending cancel messages w/o leader ID
[ https://issues.apache.org/jira/browse/FLINK-2409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649234#comment-14649234 ]

ASF GitHub Bot commented on FLINK-2409:
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/959#issuecomment-126698555

Travis failed only in one build, where the `YARNSessionFIFOITCase` did not start. I suspect that it's due to the instability of the `YARNSessionFIFOITCase` test.

Old JM web interface is sending cancel messages w/o leader ID
Key: FLINK-2409
URL: https://issues.apache.org/jira/browse/FLINK-2409
Project: Flink
Issue Type: Bug
Components: Webfrontend
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Till Rohrmann

{code}
12:29:41,877 ERROR akka.actor.OneForOneStrategy - Received a message CancelJob(4b3631741c344881362ea46e29980ce4) without a leader session ID, even though it requires to have one.
java.lang.Exception: Received a message CancelJob(4b3631741c344881362ea46e29980ce4) without a leader session ID, even though it requires to have one.
	at org.apache.flink.runtime.LeaderSessionMessages$$anonfun$receive$1.applyOrElse(LeaderSessionMessages.scala:49)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:101)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
12:29:41,879 INFO org.apache.flink.runtime.jobmanager.JobManager - Stopping JobManager akka://flink/user/jobmanager#-638215033.
{code}
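The check that produces the error in the stack trace can be sketched as a simple message filter: a JobManager rejects messages that require a leader session ID when the message carries none, or the wrong one. This is a simplified stand-in for Flink's `LeaderSessionMessages` handling, not the actual Scala code.

```java
import java.util.UUID;

// Simplified sketch of leader-session-ID validation (FLINK-2409). The real
// implementation lives in LeaderSessionMessages.scala; names here are illustrative.
public class LeaderSessionFilter {

    private final UUID currentLeaderSessionId;

    public LeaderSessionFilter(UUID currentLeaderSessionId) {
        this.currentLeaderSessionId = currentLeaderSessionId;
    }

    /** Returns true if a message carrying this leader session ID may be handled. */
    public boolean accept(UUID messageLeaderSessionId) {
        if (messageLeaderSessionId == null) {
            // This is the failure in the stack trace: the old web interface
            // sent CancelJob without any leader session ID.
            return false;
        }
        return currentLeaderSessionId.equals(messageLeaderSessionId);
    }
}
```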
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126675074

Sure. No problem. :)
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126695988

Travis build passes. You can merge it @mxm
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649214#comment-14649214 ]

ASF GitHub Bot commented on FLINK-2248:
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126695988

Travis build passes. You can merge it @mxm
[jira] [Commented] (FLINK-1819) Allow access to RuntimeContext from Input and OutputFormats
[ https://issues.apache.org/jira/browse/FLINK-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649269#comment-14649269 ]

ASF GitHub Bot commented on FLINK-1819:
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/966#discussion_r35977130

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---
@@ -75,6 +78,13 @@
 	// cancel flag
 	private volatile boolean taskCanceled = false;

+	/**
+	 * The accumulator map used in the RuntimeContext.
+	 */
+	protected Map<String, Accumulator<?, ?>> accumulatorMap;

--- End diff --

This field should be final. Or you get rid of the field and just access the accumulators via `getEnvironment().getAccumulatorRegistry().getUserMap()`.
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984232

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java ---
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli trials. For sampling without replacement, each element's sample choice is just a Bernoulli trial.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+
+	private final double fraction;
+	private final Random random;
+
+	/**
+	 * Create a Bernoulli sampler with the given sample fraction and the default random number generator.
+	 *
+	 * @param fraction sample fraction, i.e. the Bernoulli sampling probability.
+	 */
+	public BernoulliSampler(double fraction) {
+		this(fraction, new Random());
+	}
+
+	/**
+	 * Create a Bernoulli sampler with the given sample fraction and random number generator seed.
+	 *
+	 * @param fraction sample fraction, i.e. the Bernoulli sampling probability.
+	 * @param seed random number generator seed.
+	 */
+	public BernoulliSampler(double fraction, long seed) {
+		this(fraction, new Random(seed));
+	}
+
+	/**
+	 * Create a Bernoulli sampler with the given sample fraction and random number generator.
+	 *
+	 * @param fraction sample fraction, i.e. the Bernoulli sampling probability.
+	 * @param random the random number generator.
+	 */
+	public BernoulliSampler(double fraction, Random random) {
+		Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction must be between [0, 1].");
+		this.fraction = fraction;
+		this.random = random;
+	}
+
+	/**
+	 * Sample the input elements; for each input element, take a Bernoulli trial for the sample.
+	 *
+	 * @param input elements to be sampled.
+	 * @return the sampled result, which is lazily computed upon the input elements.
+	 */
+	@Override
+	public Iterator<T> sample(final Iterator<T> input) {
+		if (fraction == 0) {
+			return EMPTY_ITERABLE;
+		}
+
+		return new SampledIterator<T>() {
+			T current;
+
+			@Override
+			public boolean hasNext() {
+				if (current == null) {
+					while (input.hasNext()) {
+						T element = input.next();
+						if (random.nextDouble() <= fraction) {
+							current = element;
+							return true;
+						}
+					}
+					current = null;
+					return false;
+				}
+				return false;

--- End diff --

I think, if I'm not mistaken, that `hasNext` has to be idempotent. Thus it should return `true` if `current != null`.
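The reviewer's point is that `hasNext()` must be idempotent: calling it twice without `next()` in between must not advance the input or change the answer. The diff above returns `false` when `current != null`, so a second `hasNext()` call would wrongly report an empty iterator. A corrected, self-contained sketch of the same sampling iterator (simplified to `Integer` elements; not the actual Flink class):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

// Sketch of the idempotency fix the reviewer suggests for BernoulliSampler's
// iterator: hasNext() caches the next accepted element and keeps returning true
// until next() consumes it. Simplified stand-in, not the actual Flink code.
public class BernoulliSampleIterator implements Iterator<Integer> {

    private final Iterator<Integer> input;
    private final double fraction;
    private final Random random;
    private Integer current; // cached accepted element; null if none pending

    public BernoulliSampleIterator(Iterator<Integer> input, double fraction, long seed) {
        this.input = input;
        this.fraction = fraction;
        this.random = new Random(seed);
    }

    @Override
    public boolean hasNext() {
        if (current != null) {
            return true; // idempotent: repeated calls do not advance the input
        }
        while (input.hasNext()) {
            Integer element = input.next();
            if (random.nextDouble() <= fraction) {
                current = element; // run a Bernoulli trial per element
                return true;
            }
        }
        return false;
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Integer result = current;
        current = null;
        return result;
    }
}
```

With `fraction = 1.0` every element passes the trial, so the iterator yields all inputs; repeated `hasNext()` calls are safe either way.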
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984950

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/RandomSampler.java ---
+package org.apache.flink.api.common.operators.util;
+
+import java.util.Iterator;
+
+public abstract class RandomSampler<T> {
+
+	protected final Iterator<T> EMPTY_ITERABLE = new SampledIterator<T>() {
+		@Override
+		public boolean hasNext() {
+			return false;
+		}
+
+		@Override
+		public T next() {
+			return null;
+		}
+	};
+
+	/**
+	 * Randomly sample the elements from input, and return the result iterator.
+	 *
+	 * @param input source data
+	 * @return the sample result.
+	 */
+	public abstract Iterator<T> sample(Iterator<T> input);
+}
+
+abstract class SampledIterator<T> implements Iterator<T> {

--- End diff --

JavaDocs are missing.
[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests
[ https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649394#comment-14649394 ]

Sachin Goel commented on FLINK-2449:
Or rather override it in your test class to return true.

DistCache doesn't work with JavaProgram Collection tests
Key: FLINK-2449
URL: https://issues.apache.org/jira/browse/FLINK-2449
Project: Flink
Issue Type: Bug
Components: Tests
Reporter: Chesnay Schepler
Priority: Minor

When attempting to retrieve a file during the Collection tests in JavaProgramTestBase, you'll encounter an exception claiming the file was never registered.
[jira] [Created] (FLINK-2451) Cleanup Gelly examples
Vasia Kalavri created FLINK-2451:

Summary: Cleanup Gelly examples
Key: FLINK-2451
URL: https://issues.apache.org/jira/browse/FLINK-2451
Project: Flink
Issue Type: Improvement
Components: Gelly
Affects Versions: 0.10
Reporter: Vasia Kalavri
Assignee: Vasia Kalavri
Priority: Minor

As per discussion on the dev@ mailing list, this issue proposes the following changes to the Gelly examples and library:
1. Keep the following examples as they are: EuclideanGraphWeighing, GraphMetrics, IncrementalSSSP, JaccardSimilarity, MusicProfiles.
2. Keep only 1 example to show how to use library methods.
3. Add 1 example for vertex-centric iterations.
4. Keep 1 example for GSA iterations and move the redundant GSA implementations to the library.
5. Improve the examples' documentation and refer to the functionality that each of them demonstrates.
6. Port and modify existing example tests accordingly.
[GitHub] flink pull request: Framesize fix
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/934#discussion_r35991461

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -893,13 +1086,11 @@ public Object call() throws Exception {
 					}
 				}, executionContext);
 				break;
-			}
-			else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
+			} else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
 				postRunCleanup();
 				break;
 			}
-		}
-		else {
+		} else {

--- End diff --

You're touching a lot of code here (and above) for no other reason than reformatting. We have no style guide yet that enforces this formatting. In general, we try not to touch existing lines to keep the latest history accurate.
[GitHub] flink pull request: Framesize fix
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/934#discussion_r35991960

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -560,53 +621,177 @@ public ExecutionContext getExecutionContext() {
 	}

 	/**
+	 * This works as a cache for already merged accumulators, as, in some cases,
+	 * we do not want to re-merge accumulators, as this may lead to duplicate entries.
+	 */
+	private Map<String, Accumulator<?, ?>> mergedSmallUserAccumulators;

--- End diff --

Why can it lead to duplicate entries? The accumulators should always be merged from scratch. Every accumulator update message contains the current state of the accumulators.
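The reviewer's argument is that since every update message carries each subtask's full current accumulator state, the merged view should be rebuilt from scratch on every update; re-merging new snapshots into a cached result would double-count. A small sketch of the merge-from-scratch approach (simplified to `Long`-summing accumulators; names are illustrative, not Flink's):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of merging accumulator snapshots from scratch (the reviewer's
// suggestion). Each subtask reports its full current state, so rebuilding the
// merged view per update avoids the duplicate-entry problem a cache can cause.
public class AccumulatorMerge {

    /**
     * Rebuilds the global accumulator view from the latest snapshot of every
     * subtask. Keys of the outer map are subtask indices; inner maps are the
     * full accumulator state each subtask last reported.
     */
    public static Map<String, Long> mergeFromScratch(Map<Integer, Map<String, Long>> perSubtask) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> snapshot : perSubtask.values()) {
            snapshot.forEach((name, value) -> merged.merge(name, value, Long::sum));
        }
        return merged;
    }
}
```

Because the merge starts from an empty map each time, a repeated update from the same subtask simply replaces its snapshot rather than adding to a stale total.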
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984921 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/RandomSampler.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import java.util.Iterator; + +public abstract class RandomSampler<T> { --- End diff -- JavaDocs are missing
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649351#comment-14649351 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984921 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/RandomSampler.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import java.util.Iterator; + +public abstract class RandomSampler<T> { --- End diff -- JavaDocs are missing Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-126740651 Thanks for your contribution @ChengXiangLi. The code is really well tested and well structured. Great work :-) I had only some minor comments. There is however one thing I'm not so sure about. With the current implementation, all parallel tasks of the sampling operator will get the same random generator/seed value. Thus, every node will generate the same sequence of random numbers. I think this can have a negative influence on the sampling. What we could do is to use `RichMapPartitionFunction` instead of the `MapPartitionFunction`. With the rich function, we either have access to the subtask index, given by `getRuntimeContext().getIndexOfThisSubtask()`, which we could use to modify the initial seed or we generate the random number generator in the `open` method (this method is executed on the TaskManager). Assuming that the clocks are not completely synchronized and that the individual tasks will be instantiated not at the same time, this could give us less correlated random number sequences. What do you think?
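The suggestion above can be sketched in plain Java. Everything below is illustrative (the class name and the SplitMix64-style mixing constant are my own, not from the PR): the point is just that deriving a per-subtask seed from the user seed and the subtask index gives each parallel instance a different random sequence.

```java
import java.util.Random;

// Hypothetical sketch of per-subtask seed derivation. In the actual Flink
// operator this would live in a RichMapPartitionFunction's open() method,
// with the index supplied by getRuntimeContext().getIndexOfThisSubtask().
class PerSubtaskSeed {

    // Mix the user seed with the subtask index so neighbouring subtasks do
    // not get trivially related seeds (constant borrowed from SplitMix64).
    static long seedFor(long userSeed, int subtaskIndex) {
        long h = userSeed + 0x9E3779B97F4A7C15L * (subtaskIndex + 1);
        h ^= h >>> 33; // finalization shift: a bijective scrambling step
        return h;
    }

    // First value each subtask's generator would produce.
    static long firstDraw(long userSeed, int subtaskIndex) {
        return new Random(seedFor(userSeed, subtaskIndex)).nextLong();
    }
}
```

Inside `open()` the wiring would then be roughly `this.random = new Random(seedFor(userSeed, getRuntimeContext().getIndexOfThisSubtask()));`.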
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649332#comment-14649332 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984232 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample without replacement, each element sample choice is just a bernoulli trail. + * + * @param <T> The type of sample. + */ +public class BernoulliSampler<T> extends RandomSampler<T> { + + private final double fraction; + private final Random random; + + /** +* Create a bernoulli sampler sample fraction and default random number generator. +* +* @param fraction sample fraction, aka the bernoulli sampler possibility. 
+*/ + public BernoulliSampler(double fraction) { + this(fraction, new Random()); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator seed. +* +* @param fraction sample fraction, aka the bernoulli sampler possibility. +* @param seed random number generator seed. +*/ + public BernoulliSampler(double fraction, long seed) { + this(fraction, new Random(seed)); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator. +* +* @param fraction sample fraction, aka the bernoulli sampler possibility. +* @param random the random number generator. +*/ + public BernoulliSampler(double fraction, Random random) { + Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction must between [0, 1]."); + this.fraction = fraction; + this.random = random; + } + + /** +* Sample the input elements, for each input element, take a Bernoulli Trail for sample. +* +* @param input elements to be sampled. +* @return the sampled result which is lazy computed upon input elements. +*/ + @Override + public Iterator<T> sample(final Iterator<T> input) { + if (fraction == 0) { + return EMPTY_ITERABLE; + } + + return new SampledIterator<T>() { + T current; + + @Override + public boolean hasNext() { + if (current == null) { + while (input.hasNext()) { + T element = input.next(); + if (random.nextDouble() <= fraction) { + current = element; + return true; + } + } + current = null; + return false; + } + return false; --- End diff -- I think, if I'm not mistaken, that `hasNext` has to be idempotent. Thus it should return `true` if `current != null`. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components:
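The idempotency the comment asks for can be sketched as a small standalone iterator (names are illustrative, not the actual Flink `SampledIterator`): `hasNext()` returns `true` immediately while an element is buffered and only advances the input when the buffer is empty, so calling it repeatedly never skips elements.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

// Illustrative Bernoulli-filtering iterator with an idempotent hasNext().
class BernoulliFilterIterator<T> implements Iterator<T> {
    private final Iterator<T> input;
    private final double fraction;
    private final Random random;
    private T current; // buffered element; null when nothing is pending

    BernoulliFilterIterator(Iterator<T> input, double fraction, long seed) {
        this.input = input;
        this.fraction = fraction;
        this.random = new Random(seed);
    }

    @Override
    public boolean hasNext() {
        if (current != null) {
            return true; // idempotent: a buffered element keeps the answer stable
        }
        while (input.hasNext()) {
            T element = input.next();
            if (random.nextDouble() <= fraction) {
                current = element; // buffer it until next() is called
                return true;
            }
        }
        return false;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = current;
        current = null; // clear the buffer so hasNext() advances again
        return result;
    }
}
```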
[GitHub] flink pull request: Stale Synchronous Parallel Iterations
GitHub user nltran opened a pull request: https://github.com/apache/flink/pull/967 Stale Synchronous Parallel Iterations Here is a pull request containing our development on [Stale Synchronous Parallel (SSP)](http://reports-archive.adm.cs.cmu.edu/anon/ml2013/CMU-ML-13-103.pdf) iterations. The pull request contains: * Code supporting SSP * API to configure and enable SSP from the ExecutionEnvironment You can merge this pull request into a Git repository by running: $ git pull https://github.com/nltran/flink SSP Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/967.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #967 commit 0435bb9114f48aeefb330e7cf92610be84f79c81 Author: Nam-Luc Tran namluc.t...@euranova.eu Date: 2015-07-31T15:21:08Z * Added the model for parameter server and parameter element * Added parameter server implementation based on Apache Ignite * Started instance of parameter server in TaskManager * Added Apache Ignite as dependency in pom.xml commit decc66db856d3d640e7e29128e2696c0941091dd Author: Nam-Luc Tran namluc.t...@euranova.eu Date: 2015-07-31T15:24:32Z * Extended RichMapFunction with methods to access parameter server commit 55398da553ce697af9bea881ea7131818edb82d2 Author: Nam-Luc Tran namluc.t...@euranova.eu Date: 2015-07-31T15:25:08Z * Extended DataSet API to enable SSP and configuration commit 6ef80ceac5e6ef897769e1883823d152d0c04070 Author: Nam-Luc Tran namluc.t...@euranova.eu Date: 2015-07-31T15:25:55Z * Extended ExecutionEnvironment and ExecutionConfig to enable SSP and SPP configuration commit 368ca1c101034aefa8b6ac0ce4791133976831e0 Author: Nam-Luc Tran namluc.t...@euranova.eu Date: 2015-07-31T15:29:29Z * Added drop-in control structures for Stale Synchronous Parallel iterations commit 23fb6518fe7042ad859bb9474266131f2bdb669c Author: Nam-Luc Tran namluc.t...@euranova.eu Date: 
2015-07-31T15:30:31Z * Added the events used by the control structures for Stale Synchronous Parallel iterations commit 48810bde7783610112751f2221126b69a1ac9b56 Author: Nam-Luc Tran namluc.t...@euranova.eu Date: 2015-07-31T15:32:02Z * Extended the job translation to take into account the control structures related to Stale Synchronous Parallel Iterations
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35988629 --- Diff: pom.xml --- @@ -224,6 +224,12 @@ under the License. <version>3.2.1</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + <version>3.5</version> + </dependency> --- End diff -- For that we have to add an entry in the `flink-dist/NOTICE` and `flink-dist/LICENSE` files. But I can do that when merging the PR.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649419#comment-14649419 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35988629 --- Diff: pom.xml --- @@ -224,6 +224,12 @@ under the License. <version>3.2.1</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + <version>3.5</version> + </dependency> --- End diff -- For that we have to add an entry in the `flink-dist/NOTICE` and `flink-dist/LICENSE` files. But I can do that when merging the PR. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility.
[jira] [Commented] (FLINK-2446) SocketTextStreamFunction has memory leak when reconnect server
[ https://issues.apache.org/jira/browse/FLINK-2446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649385#comment-14649385 ] ASF GitHub Bot commented on FLINK-2446: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/965 SocketTextStreamFunction has memory leak when reconnect server -- Key: FLINK-2446 URL: https://issues.apache.org/jira/browse/FLINK-2446 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.10 Reporter: fangfengbin Priority: Minor
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649341#comment-14649341 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984449 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.Iterator; + +/** + * A sampler implementation based on Poisson Distribution. 
While sample elements with replacement, + * the picked number of each element follow poisson distribution, so we could use poisson distribution --- End diff -- Typo: element follows a given poisson distribution Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649337#comment-14649337 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984385 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.Iterator; + +/** + * A sampler implementation based on Poisson Distribution. 
While sample elements with replacement, --- End diff -- While sampling elements Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility.
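The trick described in that javadoc can be sketched in plain Java: for sampling with replacement, each input element is emitted k times, where k is drawn from a Poisson distribution whose mean is the sample fraction. The sampler under review uses commons-math's `PoissonDistribution`; to keep this sketch dependency-free it inlines Knuth's algorithm (adequate for small means), and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Illustrative Poisson sampling-with-replacement sketch.
class PoissonSampleSketch {

    // Knuth's Poisson sampler: multiply uniforms until the product drops
    // below exp(-mean). Fine for the small means typical of sample fractions.
    static int poisson(double mean, Random random) {
        double limit = Math.exp(-mean);
        int k = 0;
        double p = 1.0;
        do {
            k++;
            p *= random.nextDouble();
        } while (p > limit);
        return k - 1;
    }

    // Emit each element 0, 1, or several times according to its Poisson draw.
    static <T> List<T> sampleWithReplacement(Iterator<T> input, double fraction, long seed) {
        Random random = new Random(seed);
        List<T> out = new ArrayList<>();
        while (input.hasNext()) {
            T element = input.next();
            int copies = poisson(fraction, random);
            for (int i = 0; i < copies; i++) {
                out.add(element);
            }
        }
        return out;
    }
}
```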
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35985033 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +/** + * A simple in memory implementation of Reservoir Sampling with replacement, and with only one pass through + * the input iteration whose size is unpredictable. + * This implementation refers to the algorithm described in "Reservoir-based Random Sampling with Replacement + * from Data Stream". --- End diff -- Maybe put a link to the paper in here.
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35985704 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithoutReplacement.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +/** + * A simple in memory implementation of Reservoir Sampling without replacement, and with only one pass through + * the input iteration whose size is unpredictable. + * This implementation refers to the Algorithm R described in "Random Sampling with a Reservoir" (Vitter, 1985). --- End diff -- Maybe add again a link to the paper if available.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649372#comment-14649372 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35985704 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithoutReplacement.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +/** + * A simple in memory implementation of Reservoir Sampling without replacement, and with only one pass through + * the input iteration whose size is unpredictable. + * This implementation refers to the Algorithm R described in "Random Sampling with a Reservoir" (Vitter, 1985). --- End diff -- Maybe add again a link to the paper if available. 
Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility.
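For reference, Algorithm R from Vitter's 1985 paper, which the sampler above is based on, fits in a few lines (this is a generic sketch, not the Flink class): fill the reservoir with the first k elements, then let the i-th element overwrite a uniformly chosen slot with probability k/i.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Illustrative one-pass reservoir sampling without replacement (Algorithm R).
class AlgorithmR {

    static <T> List<T> sample(Iterator<T> input, int k, long seed) {
        Random random = new Random(seed);
        List<T> reservoir = new ArrayList<>(k);
        int seen = 0;
        while (input.hasNext()) {
            T element = input.next();
            seen++;
            if (reservoir.size() < k) {
                reservoir.add(element); // keep the first k elements unconditionally
            } else {
                // The i-th element (1-based) replaces a random slot with
                // probability k / i: nextInt(seen) < k happens k times in seen.
                int j = random.nextInt(seen);
                if (j < k) {
                    reservoir.set(j, element);
                }
            }
        }
        return reservoir;
    }
}
```

Note that the size of the input never needs to be known in advance, which is exactly why the one-pass property matters for an iterator of unpredictable length.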
[jira] [Closed] (FLINK-2446) SocketTextStreamFunction has memory leak when reconnect server
[ https://issues.apache.org/jira/browse/FLINK-2446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi closed FLINK-2446. - Resolution: Fixed Fix Version/s: 0.10 Fixed via b0f2379. SocketTextStreamFunction has memory leak when reconnect server -- Key: FLINK-2446 URL: https://issues.apache.org/jira/browse/FLINK-2446 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.10 Reporter: fangfengbin Priority: Minor Fix For: 0.10
[GitHub] flink pull request: [FLINK-2436] [streaming] Make ByteStreamStateH...
Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/958#issuecomment-126733653 If you could take a quick look @uce, I would like to merge this :) I think it is a safe change.
[jira] [Commented] (FLINK-2436) Make ByteStreamStateHandles more robust
[ https://issues.apache.org/jira/browse/FLINK-2436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649398#comment-14649398 ] ASF GitHub Bot commented on FLINK-2436: --- Github user gyfora commented on the pull request: https://github.com/apache/flink/pull/958#issuecomment-126733653 If you could take a quick look @uce, I would like to merge this :) I think it is a safe change. Make ByteStreamStateHandles more robust --- Key: FLINK-2436 URL: https://issues.apache.org/jira/browse/FLINK-2436 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Gyula Fora Assignee: Gyula Fora Priority: Critical The ByteStreamStateHandles are currently implemented in a very specific way that makes assumptions of their actual usage. Also there is a possible resource leak when errors occur during writing or reading from the stream. This affects FileStateHandles as well of course.
[jira] [Updated] (FLINK-2452) Add a playcount threshold to the MusicProfiles example
[ https://issues.apache.org/jira/browse/FLINK-2452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri updated FLINK-2452: - Summary: Add a playcount threshold to the MusicProfiles example (was: Add a playcount threshold to the MusicProfiles examples) Add a playcount threshold to the MusicProfiles example -- Key: FLINK-2452 URL: https://issues.apache.org/jira/browse/FLINK-2452 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.10 Reporter: Vasia Kalavri Assignee: Vasia Kalavri Priority: Minor In the MusicProfiles example, when creating the user-user similarity graph, an edge is created between any 2 users that have listened to the same song (even if once). Depending on the input data, this might produce a projection graph with many more edges than the original user-song graph. To make this computation more efficient, this issue proposes adding a user-defined parameter that filters out songs that a user has listened to only a few times. Essentially, it is a threshold for playcount, above which a user is considered to like a song. For reference, with a threshold value of 30, the whole Last.fm dataset is analyzed on my laptop in a few minutes, while no threshold results in a runtime of several hours. There are many solutions to this problem, but since this is just an example (not a library method), I think that keeping it simple is important. Thanks to [~andralungu] for spotting the inefficiency!
[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests
[ https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649444#comment-14649444 ] Chesnay Schepler commented on FLINK-2449: - fair enough, scratch the part about skipping the test. DistCache doesn't work with JavaProgram Collection tests Key: FLINK-2449 URL: https://issues.apache.org/jira/browse/FLINK-2449 Project: Flink Issue Type: Bug Components: Tests Reporter: Chesnay Schepler Priority: Minor When attempting to retrieve a file during the Collection test in JavaProgramTestBase you'll encounter an exception claiming the file was never registered.
[GitHub] flink pull request: Framesize fix
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/934#discussion_r35991476 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -979,8 +1174,7 @@ public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) { if (execution == null) { fail(new IllegalStateException("Cannot find execution for execution ID " + partitionId.getPartitionId())); - } - else if (execution.getVertex() == null){ + } else if (execution.getVertex() == null) { --- End diff -- Same as above.
[GitHub] flink pull request: Framesize fix
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/934#discussion_r35991467 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -918,8 +1109,7 @@ private void postRunCleanup() { if (coord != null) { coord.shutdown(); } - } - catch (Exception e) { + } catch (Exception e) { --- End diff -- Same as above.
[GitHub] flink pull request: Framesize fix
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/934#discussion_r35991472 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -966,8 +1162,7 @@ public boolean updateState(TaskExecutionState state) { attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState())); return false; } - } - else { + } else { --- End diff -- Same as above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649358#comment-14649358 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35985033 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +/** + * A simple in memory implementation of Reservoir Sampling with replacement, and with only one pass through + * the input iteration whose size is unpredictable. + * This implementation refers to the algorithm described in "Reservoir-based Random Sampling with Replacement + * from Data Stream". --- End diff -- Maybe put a link to the paper in here.
Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
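The with-replacement reservoir scheme discussed in the review above can be sketched standalone. This is an illustrative sketch only, not Flink's `ReservoirSamplerWithReplacement` (the class and method names here are invented for the example): each of the k reservoir slots acts as an independent size-1 reservoir, so the j-th element overwrites a slot with probability 1/j, yielding k uniform samples drawn with replacement in one pass over an input of unknown size.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Illustrative sketch only -- not the Flink implementation under review.
public class ReservoirWithReplacementSketch {
    public static <T> List<T> sample(Iterator<T> input, int k, long seed) {
        Random random = new Random(seed);
        List<T> reservoir = new ArrayList<>(k);
        long seen = 0; // number of elements consumed so far
        while (input.hasNext()) {
            T element = input.next();
            seen++;
            for (int slot = 0; slot < k; slot++) {
                if (seen == 1) {
                    reservoir.add(element); // first element fills every slot
                } else if (random.nextDouble() < 1.0 / seen) {
                    reservoir.set(slot, element); // replace slot with probability 1/j
                }
            }
        }
        return reservoir;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }
        System.out.println(sample(data.iterator(), 5, 42L));
    }
}
```

Because every slot is an independent size-1 reservoir, the k samples are i.i.d. uniform over the stream, which is exactly the with-replacement semantics.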
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35985116 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +/** + * A simple in memory implementation of Reservoir Sampling with replacement, and with only one pass through + * the input iteration whose size is unpredictable. + * This implementation refers to the algorithm described in "Reservoir-based Random Sampling with Replacement + * from Data Stream". + * + * @param <T> the type of sample.
+ */ +public class ReservoirSamplerWithReplacement<T> extends RandomSampler<T> { + private final int numSamples; + private final Random random; + private PoissonDistribution poissonDistribution; + private List<Integer> positions; + + /** +* Create a reservoir sampler with fixed sample size and default random number generator. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. +*/ + public ReservoirSamplerWithReplacement(int numSamples) { + this(numSamples, new Random()); + } + + /** +* Create a reservoir sampler with fixed sample size and random number generator seed. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. +* @param seed random number generator seed +*/ + public ReservoirSamplerWithReplacement(int numSamples, long seed) { + this(numSamples, new Random(seed)); + } + + /** +* Create a reservoir sampler with fixed sample size and random number generator. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. --- End diff -- @param is missing --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649360#comment-14649360 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35985116 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +/** + * A simple in memory implementation of Reservoir Sampling with replacement, and with only one pass through + * the input iteration whose size is unpredictable. + * This implementation refers to the algorithm described in "Reservoir-based Random Sampling with Replacement + * from Data Stream". + * + * @param <T> the type of sample.
+ */ +public class ReservoirSamplerWithReplacement<T> extends RandomSampler<T> { + private final int numSamples; + private final Random random; + private PoissonDistribution poissonDistribution; + private List<Integer> positions; + + /** +* Create a reservoir sampler with fixed sample size and default random number generator. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. +*/ + public ReservoirSamplerWithReplacement(int numSamples) { + this(numSamples, new Random()); + } + + /** +* Create a reservoir sampler with fixed sample size and random number generator seed. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. +* @param seed random number generator seed +*/ + public ReservoirSamplerWithReplacement(int numSamples, long seed) { + this(numSamples, new Random(seed)); + } + + /** +* Create a reservoir sampler with fixed sample size and random number generator. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. --- End diff -- @param is missing Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests
[ https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649392#comment-14649392 ] Sachin Goel commented on FLINK-2449: You can call {{skipCollectionExecution}} :) DistCache doesn't work with JavaProgram Collection tests Key: FLINK-2449 URL: https://issues.apache.org/jira/browse/FLINK-2449 Project: Flink Issue Type: Bug Components: Tests Reporter: Chesnay Schepler Priority: Minor When attempting to retrieve a file during the Collection test in JavaProgramTestBase you'll encounter an exception claiming the file was never registered. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649429#comment-14649429 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-126740651 Thanks for your contribution @ChengXiangLi. The code is really well tested and well structured. Great work :-) I had only some minor comments. There is however one thing I'm not so sure about. With the current implementation, all parallel tasks of the sampling operator will get the same random generator/seed value. Thus, every node will generate the same sequence of random numbers. I think this can have a negative influence on the sampling. What we could do is to use `RichMapPartitionFunction` instead of the `MapPartitionFunction`. With the rich function, we either have access to the subtask index, given by `getRuntimeContext().getIndexOfThisSubtask()`, which we could use to modify the initial seed or we generate the random number generator in the `open` method (this method is executed on the TaskManager). Assuming that the clocks are not completely synchronized and that the individual tasks will be instantiated not at the same time, this could give us less correlated random number sequences. What do you think? Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
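The per-subtask seeding idea in the comment above (derive each parallel task's random generator from the base seed and `getRuntimeContext().getIndexOfThisSubtask()` inside a `RichMapPartitionFunction`) can be illustrated without any Flink dependency. This is a hedged sketch: `rngForSubtask` is an invented helper, and the simple additive offset is only the minimal version of the idea — for a linear congruential generator like `java.util.Random`, mixing the index in via a hash would decorrelate the sequences more robustly.

```java
import java.util.Random;

// Illustrative only: why seeding each parallel subtask differently matters.
// In Flink this index would come from getRuntimeContext().getIndexOfThisSubtask().
public class SeedPerSubtask {
    // Hypothetical helper: one RNG per parallel subtask, derived from a base seed.
    // A hash-based mix (rather than plain addition) would be safer in practice.
    public static Random rngForSubtask(long baseSeed, int subtaskIndex) {
        return new Random(baseSeed + subtaskIndex);
    }

    public static void main(String[] args) {
        // With a shared seed every subtask would produce the SAME sequence;
        // offsetting by the subtask index gives each one its own sequence.
        double fromSubtask0 = rngForSubtask(1234L, 0).nextDouble();
        double fromSubtask1 = rngForSubtask(1234L, 1).nextDouble();
        System.out.println(fromSubtask0 == fromSubtask1
                ? "identical sequences" : "decorrelated sequences");
    }
}
```

The same derivation keeps reproducibility: re-running with the same base seed and the same parallelism regenerates identical per-subtask sequences, which the relying-on-`open()`-timing alternative mentioned in the comment would not.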
[GitHub] flink pull request: Framesize fix
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/934#discussion_r35991266 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -75,24 +78,24 @@ * The execution graph is the central data structure that coordinates the distributed * execution of a data flow. It keeps representations of each parallel task, each * intermediate result, and the communication between them. - * + * <p/> * The execution graph consists of the following constructs: * <ul> - * <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like - * map or join) during execution. It holds the aggregated state of all parallel subtasks. - * The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes - * from the JobGraph's corresponding JobVertex.</li> - * <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are - * as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by - * the ExecutionJobVertex and the number of the parallel subtask</li> - * <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions - * for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed - * because it is no longer available when requested by later operations. An Execution is always - * identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager - * about deployment of tasks and updates in the task status always use the ExecutionAttemptID to - * address the message receiver.</li> + * <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like + * map or join) during execution. It holds the aggregated state of all parallel subtasks.
+ * The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes + * from the JobGraph's corresponding JobVertex.</li> + * <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are + * as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by + * the ExecutionJobVertex and the number of the parallel subtask</li> + * <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions + * for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed + * because it is no longer available when requested by later operations. An Execution is always + * identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager + * about deployment of tasks and updates in the task status always use the ExecutionAttemptID to + * address the message receiver.</li> --- End diff -- This change seems unnecessary. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: Framesize fix
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/934#discussion_r35991305 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -560,53 +621,177 @@ public ExecutionContext getExecutionContext() { } /** +* This works as cache for already merged accumulators, as, in some cases, +* we do not want to remerge accumulators as this may lead to duplicate entries. +*/ + private Map<String, Accumulator<?, ?>> mergedSmallUserAccumulators; + + /** * Merges all accumulator results from the tasks previously executed in the Executions. +* * @return The accumulator map */ - public Map<String, Accumulator<?,?>> aggregateUserAccumulators() { + public Map<String, Accumulator<?, ?>> aggregateSmallUserAccumulators() { + return aggregateSmallUserAccumulators(true); + } - Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>(); + /** +* Merges all accumulator results from the tasks previously executed in the Executions. +* If <code>reaggregate</code> is set to false, then no aggregation is performed, and +* the cache merge result is returned. Otherwise accumulators are merged. +* +* @param reaggregate <code>true</code> if we want to aggregate accumulators, +*<code>false</code> otherwise. +* @return The accumulator map +*/ + public Map<String, Accumulator<?, ?>> aggregateSmallUserAccumulators(boolean reaggregate) { + if (!reaggregate) { + return mergedSmallUserAccumulators; + } + this.mergedSmallUserAccumulators = new HashMap<String, Accumulator<?, ?>>(); for (ExecutionVertex vertex : getAllExecutionVertices()) { - Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators(); + Map<String, Accumulator<?, ?>>
next = vertex.getCurrentExecutionAttempt().getSmallUserAccumulators(); if (next != null) { - AccumulatorHelper.mergeInto(userAccumulators, next); + AccumulatorHelper.mergeInto(mergedSmallUserAccumulators, next); } } + return mergedSmallUserAccumulators; + } - return userAccumulators; + /** +* Merges all blobKeys referring to blobs of large accumulators. These refer to blobs in the +* blobCache holding accumulators (results of tasks) that did not fit in an akka frame, +* thus had to be sent through the BlobCache. +* +* @return The accumulator map +*/ + public Map<String, List<BlobKey>> aggregateLargeUserAccumulatorBlobKeys() { + Map<String, List<BlobKey>> largeUserAccumulatorRefs = new HashMap<String, List<BlobKey>>(); + + for (ExecutionVertex vertex : getAllExecutionVertices()) { + Map<String, List<BlobKey>> next = vertex.getCurrentExecutionAttempt().getLargeUserAccumulatorBlobKeys(); + mergeLargeUserAccumulatorBlobKeys(largeUserAccumulatorRefs, next); + } + return largeUserAccumulatorRefs; } /** -* Gets a serialized accumulator map. +* Adds new blobKeys referring to blobs of large accumulators to the already existing ones. +* These refer to blobs in the blobCache holding accumulators (results of tasks) that did not +* fit in an akka frame, thus had to be sent through the BlobCache.
+* +* @param target the initial blobKey map +* @param toMerge the new keys to add to the initial map +* @return The resulting accumulator map +*/ + public Map<String, List<BlobKey>> addLargeUserAccumulatorBlobKeys( + Map<String, List<BlobKey>> target, Map<String, List<BlobKey>> toMerge) { + if (target == null) { + target = new HashMap<String, List<BlobKey>>(); + } + mergeLargeUserAccumulatorBlobKeys(target, toMerge); + return target; + } + + private void mergeLargeUserAccumulatorBlobKeys( + Map<String, List<BlobKey>> target, Map<String, List<BlobKey>> toMerge) { + if (toMerge == null || toMerge.isEmpty()) { + return; + } + + for (Map.Entry<String, List<BlobKey>> otherEntry : toMerge.entrySet()) { + List<BlobKey> existing = target.get(otherEntry.getKey()); + if (existing == null) { + target.put(otherEntry.getKey(), otherEntry.getValue()); + } else
[GitHub] flink pull request: Framesize fix
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/934#discussion_r35991400 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -619,6 +718,21 @@ public ExecutionContext getExecutionContext() { resultStrings[i++] = result; } + for (Map.Entry<String, List<BlobKey>> entry : largeAccumulatorMap.entrySet()) { + + if(!smallAccumulatorMap.containsKey(entry.getKey())) { + StringBuilder str = new StringBuilder(); + str.append("BlobKeys=[ "); + for (BlobKey bk : entry.getValue()) { + str.append(bk + " "); + } + str.append("]"); + + StringifiedAccumulatorResult result = + new StringifiedAccumulatorResult(entry.getKey(), "Blob/Serialized", str.toString()); + resultStrings[i++] = result; + } + } --- End diff -- Ah ok. You're running into the same problem with the framesize here again. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests
[ https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649315#comment-14649315 ] Sachin Goel commented on FLINK-2449: Collection Executor runs locally without any JobManager or TaskManager. There would be no need of a DistributedCache for this. You could just access any file locally. DistCache doesn't work with JavaProgram Collection tests Key: FLINK-2449 URL: https://issues.apache.org/jira/browse/FLINK-2449 Project: Flink Issue Type: Bug Components: Tests Reporter: Chesnay Schepler Priority: Minor When attempting to retrieve a file during the Collection test in JavaProgramTestBase you'll encounter an exception claiming the file was never registered. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2449) DistCache doesn't work with JavaProgram Collection tests
[ https://issues.apache.org/jira/browse/FLINK-2449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649323#comment-14649323 ] Chesnay Schepler commented on FLINK-2449: - I know I can. But this means that you cannot write a single test extending the JavaProgramTestBase that uses the DistCache without hacking around to skip the CollectionTests, like I currently have to with the Python API. The exception should be thrown much earlier, when registering the file, as opposed to when retrieving it. I'd also argue that there should be an easy way to skip the Collection Test if it doesn't support all features. DistCache doesn't work with JavaProgram Collection tests Key: FLINK-2449 URL: https://issues.apache.org/jira/browse/FLINK-2449 Project: Flink Issue Type: Bug Components: Tests Reporter: Chesnay Schepler Priority: Minor When attempting to retrieve a file during the Collection test in JavaProgramTestBase you'll encounter an exception claiming the file was never registered. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984385 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.Iterator; + +/** + * A sampler implementation based on Poisson Distribution. While sample elements with replacement, --- End diff -- While sampling elements --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984449 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.Iterator; + +/** + * A sampler implementation based on Poisson Distribution. While sample elements with replacement, + * the picked number of each element follow poisson distribution, so we could use poisson distribution --- End diff -- Typo: element follows a given poisson distribution --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1901] [core] Create sample operator for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984320 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample without replacement, each element sample choice is just a bernoulli trail. + * + * @param <T> The type of sample. + */ +public class BernoulliSampler<T> extends RandomSampler<T> { + + private final double fraction; + private final Random random; + + /** +* Create a bernoulli sampler sample fraction and default random number generator. +* +* @param fraction sample fraction, aka the bernoulli sampler possibility. +*/ + public BernoulliSampler(double fraction) { + this(fraction, new Random()); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator seed. +* +* @param fraction sample fraction, aka the bernoulli sampler possibility. +* @param seed random number generator seed.
+*/ + public BernoulliSampler(double fraction, long seed) { + this(fraction, new Random(seed)); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator. +* +* @param fraction sample fraction, aka the bernoulli sampler possibility. +* @param random the random number generator. +*/ + public BernoulliSampler(double fraction, Random random) { + Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction fraction must between [0, 1]."); + this.fraction = fraction; + this.random = random; + } + + /** +* Sample the input elements, for each input element, take a Bernoulli Trail for sample. +* +* @param input elements to be sampled. +* @return the sampled result which is lazy computed upon input elements. +*/ + @Override + public Iterator<T> sample(final Iterator<T> input) { + if (fraction == 0) { + return EMPTY_ITERABLE; + } + + return new SampledIterator<T>() { + T current; + + @Override + public boolean hasNext() { + if (current == null) { + while (input.hasNext()) { + T element = input.next(); + if (random.nextDouble() <= fraction) { + current = element; + return true; + } + } + current = null; + return false; + } + return false; --- End diff -- Could we put the last return statement in an `else` branch? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
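The Bernoulli sampling idea in the diff above (without replacement, each element is kept independently with probability `fraction`) can be sketched eagerly and self-contained. This is an illustrative sketch, not the lazy-iterator `BernoulliSampler` under review; the class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Minimal, eager sketch of Bernoulli sampling (illustrative only).
public class BernoulliSketch {
    public static <T> List<T> sample(Iterator<T> input, double fraction, long seed) {
        if (fraction < 0 || fraction > 1) {
            throw new IllegalArgumentException("fraction must be in [0, 1]");
        }
        Random random = new Random(seed);
        List<T> out = new ArrayList<>();
        if (fraction == 0) {
            return out; // short-circuit, mirroring the EMPTY_ITERABLE case above
        }
        while (input.hasNext()) {
            T element = input.next();
            // One Bernoulli trial per element: keep it with probability `fraction`.
            if (random.nextDouble() <= fraction) {
                out.add(element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            data.add(i);
        }
        System.out.println("kept " + sample(data.iterator(), 0.1, 42L).size() + " of 1000");
    }
}
```

Note that the sample size is itself random (binomial in the input size), which is why the fraction-based Bernoulli sampler and the fixed-size reservoir samplers coexist in this pull request.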
[GitHub] flink pull request: [FLINK-2446]Fix SocketTextStreamFunction has m...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/965 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649215#comment-14649215 ] ASF GitHub Bot commented on FLINK-2248: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/957 Allow disabling of stdout logging output Key: FLINK-2248 URL: https://issues.apache.org/jira/browse/FLINK-2248 Project: Flink Issue Type: Improvement Reporter: Theodore Vasiloudis Priority: Minor Labels: starter Currently when a job is submitted through the CLI we get in stdout all the log output about each stage in the job. It would be useful to have an easy way to disable this output when submitting the job, as most of the time we are only interested in the log output if something goes wrong. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels closed FLINK-2248. --- Resolution: Fixed Fix Version/s: 0.9.1, 0.10 Allow disabling of stdout logging output Key: FLINK-2248 URL: https://issues.apache.org/jira/browse/FLINK-2248 Project: Flink Issue Type: Improvement Reporter: Theodore Vasiloudis Assignee: Sachin Goel Priority: Minor Labels: starter Fix For: 0.10, 0.9.1 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: Framesize fix
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/934#issuecomment-126752412 Hi @kl0u, Thanks for addressing my comments. I've made some new ones :) but it looks much better. As for merging the different types of snapshots, the only solution I see now is to let all the accumulator results go through the BlobCache. That might cause some extra latency at the end of the job. I'm not sure whether we should proceed on that path, but the current solution has a lot of added overhead because we need to deal with two different kinds of update mechanisms. I would like to hear some other opinions on whether we should have both update mechanisms (BlobCache and Akka message-based) or just let everything go through the BlobCache. I'm leaning more towards the latter if it is not too slow for slow accumulators. I think you have to rebase to the latest master again because of some minor changes.
[jira] [Created] (FLINK-2454) Update Travis file to run build using Java7
Henry Saputra created FLINK-2454: Summary: Update Travis file to run build using Java7 Key: FLINK-2454 URL: https://issues.apache.org/jira/browse/FLINK-2454 Project: Flink Issue Type: Sub-task Reporter: Henry Saputra Update Travis file to run build using Java7 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2455) Misleading I/O manager error log messages
Ufuk Celebi created FLINK-2455: -- Summary: Misleading I/O manager error log messages Key: FLINK-2455 URL: https://issues.apache.org/jira/browse/FLINK-2455 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: 0.9, master Reporter: Ufuk Celebi Fix For: 0.10, 0.9.1 The logs reported by [~andralungu] in FLINK-2412 show a lot of the following messages: {code} 20:13:27,504 WARN org.apache.flink.runtime.taskmanager.Task - Task 'CHAIN DataSource (at getEdgesDataSet(Degrees.java:64) (org.apache.flink.api.java.io.CsvInputFormat)) - Map (Map at getEdgesDataSet(Degrees.java:64)) (50/60)' did not react to cancelling signal, but is stuck in method: java.lang.Object.wait(Native Method) org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:126) org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:158) org.apache.flink.runtime.io.network.partition.SpillableSubpartition.release(SpillableSubpartition.java:130) org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:300) org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:95) org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:356) org.apache.flink.runtime.taskmanager.Task.run(Task.java:674) java.lang.Thread.run(Thread.java:722) 20:13:27,583 ERROR org.apache.flink.runtime.io.network.partition.ResultPartition - Error during release of result subpartition: Closing of asynchronous file channel was interrupted. java.io.IOException: Closing of asynchronous file channel was interrupted. 
at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.close(AsynchronousFileIOChannel.java:130) at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.closeAndDelete(AsynchronousFileIOChannel.java:158) at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.release(SpillableSubpartition.java:130) at org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:300) at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:95) at org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:356) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:674) at java.lang.Thread.run(Thread.java:722) {code} This is repeated for each subpartition during the release of a spillable partition (each subpartition is closed independently). The task is interrupted while waiting for the file channel to be closed. {code} 20:15:50,329 ERROR org.apache.flink.runtime.io.network.partition.ResultPartition - Error during release of result subpartition: IO-Manager has been closed. java.io.IOException: IO-Manager has been closed. at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.shutdown(IOManagerAsync.java:424) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:125) at org.apache.flink.runtime.io.disk.iomanager.IOManager$1.run(IOManager.java:103) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
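The failure mode behind these messages can be reproduced in miniature. The sketch below uses assumed names (it is not Flink's actual AsynchronousFileIOChannel): a close() that blocks until pending work drains will surface the task thread's cancelling interrupt as an IOException during release, which is exactly the shape of the log output above.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: a blocking close() turns a cancelling interrupt into an IOException.
public class InterruptedCloseSketch {

    public static class AsyncChannel {
        // Pending writes that are never drained in this sketch, so close() blocks forever.
        private final CountDownLatch pendingWrites = new CountDownLatch(1);

        public void close() throws IOException {
            try {
                // Blocks like the Object.wait() seen in the reported stack trace.
                pendingWrites.await();
            } catch (InterruptedException e) {
                throw new IOException("Closing of asynchronous file channel was interrupted.", e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AsyncChannel channel = new AsyncChannel();
        Thread task = new Thread(() -> {
            try {
                channel.close();
                System.out.println("closed cleanly");
            } catch (IOException e) {
                System.out.println(e.getMessage());
            }
        });
        task.start();
        Thread.sleep(100);
        task.interrupt(); // the "cancelling signal" from the TaskManager
        task.join();
    }
}
```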
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649744#comment-14649744 ] Sheetal Parade commented on FLINK-2314: --- If no one is working on this issue, I would like to work on it. Make Streaming File Sources Persistent -- Key: FLINK-2314 URL: https://issues.apache.org/jira/browse/FLINK-2314 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.9 Reporter: Stephan Ewen Labels: easyfix, starter Streaming File sources should participate in the checkpointing. They should track the bytes they read from the file and checkpoint it. One can look at the sequence-generating source function for an example of a checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
Gabor Gevay created FLINK-2447: -- Summary: TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type Key: FLINK-2447 URL: https://issues.apache.org/jira/browse/FLINK-2447 Project: Flink Issue Type: Bug Reporter: Gabor Gevay Consider the following code: DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo()); DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() { @Override public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception { return null; } }); where FooBarPojo is the following type: public class FooBarPojo { public int foo, bar; public FooBarPojo() {} } This should print a tuple type with two identical fields: Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>> But it prints the following instead: Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>> Note that this problem causes some co-groups in Gelly to crash with org.apache.flink.api.common.InvalidProgramException: The pair of co-group keys are not compatible with each other when the vertex ID type is a POJO, because the second field of the Edge type gets to be a generic type, but the POJO gets recognized in the Vertex type, and getNumberOfKeyFields returns different numbers for the POJO and the generic type. The source of the problem is the mechanism in TypeExtractor that detects recursive types (see the alreadySeen field in TypeExtractor), as it mistakes the second appearance of FooBarPojo for a recursive field. 
Specifically, the following happens: createTypeInfoWithTypeHierarchy starts to process the Tuple2<FooBarPojo, FooBarPojo> type, and in line 434 it calls itself for the first field, which proceeds into the privateGetForClass case, which correctly detects that it is a POJO and correctly returns a PojoTypeInfo; but in the meantime, in line 1191, privateGetForClass adds PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy approaches the second field and goes into privateGetForClass, which mistakenly returns a GenericTypeInfo, as it thinks in line 1187 that a recursive type is being processed. (Note that if we comment out the recursive type detection (the lines that do their thing with the alreadySeen field), then the output is correct.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
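The recursion-detection pitfall described in this report can be illustrated with a toy extractor. This is hypothetical code, not Flink's TypeExtractor (the method `extractTupleFieldInfos` is invented; only the `alreadySeen` idea mirrors the description): once a class is recorded in `alreadySeen`, the second sibling field of the same class is mistaken for recursion and falls back to a generic type.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy illustration of the alreadySeen pitfall: a repeated sibling type is
// indistinguishable from a genuinely recursive type when detection only
// tracks "classes seen so far".
public class AlreadySeenPitfall {

    public static List<String> extractTupleFieldInfos(Class<?>... fieldTypes) {
        Set<Class<?>> alreadySeen = new HashSet<>();
        List<String> infos = new ArrayList<>();
        for (Class<?> c : fieldTypes) {
            if (!alreadySeen.add(c)) {
                // Second occurrence is mistaken for recursion -> generic fallback.
                infos.add("GenericType<" + c.getSimpleName() + ">");
            } else {
                infos.add("PojoType<" + c.getSimpleName() + ">");
            }
        }
        return infos;
    }

    static class FooBarPojo {
        public int foo, bar;
    }

    public static void main(String[] args) {
        // Two fields of the same POJO type, as in the Tuple2 from the report.
        System.out.println(extractTupleFieldInfos(FooBarPojo.class, FooBarPojo.class));
        // -> [PojoType<FooBarPojo>, GenericType<FooBarPojo>]
    }
}
```

A fix would need to distinguish "same class appearing again on a sibling path" from "same class appearing again on the current descent path", e.g. by removing the class from `alreadySeen` once its extraction finishes.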
[jira] [Updated] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
[ https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2447: --- TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
[jira] [Updated] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
[ https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay updated FLINK-2447: --- Component/s: Java API TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type --- Key: FLINK-2447 URL: https://issues.apache.org/jira/browse/FLINK-2447 Project: Flink Issue Type: Bug Components: Java API Reporter: Gabor Gevay -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/957#discussion_r35966826 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.StreamingMode; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.io.PrintStream; + +import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath; +import static org.apache.flink.client.CliFrontendTestUtils.getConfigDir; +import static org.junit.Assert.fail; + +public class CliFrontendLoggingTest { + + private static LocalFlinkMiniCluster cluster; + private static Configuration config; + private static String hostPort; + private ByteArrayOutputStream stream = new ByteArrayOutputStream(); + private CliFrontend cli; + private OutputStream output; + + @Before + public void setUp() throws Exception { + stream.reset(); + output = System.out; + System.setOut(new PrintStream(stream)); + + config = new Configuration(); + config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1); + config.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false); + hostPort = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ":" + config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); + + try { + cluster = new LocalFlinkMiniCluster(config, false, StreamingMode.BATCH_ONLY); + } + catch (Exception e) { + e.printStackTrace(); + fail("Setup of test actor system failed."); + } + + cli = new CliFrontend(getConfigDir()); + } + + @After + public void shutDownActorSystem() { + System.setOut(new PrintStream(output)); --- End diff -- I think it should be `System.setOut(output)` here. 
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649117#comment-14649117 ] ASF GitHub Bot commented on FLINK-2248: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/957#discussion_r35966826 --- End diff -- I think it should be `System.setOut(output)` here. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649121#comment-14649121 ] ASF GitHub Bot commented on FLINK-2248: --- Github user sachingoel0101 commented on a diff in the pull request: https://github.com/apache/flink/pull/957#discussion_r35967117 --- End diff -- System.setOut only takes a PrintStream as argument. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649135#comment-14649135 ] ASF GitHub Bot commented on FLINK-2248: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126669058 There was no specific need for it. However, since the CliFrontend only passes the Client the configuration, I decided to include it with that. There is a method on the Client to set the printing. Further, I think while writing programs using flink [flink-example-esqe] and running them in IDE, it would be nice to have a config entry to turn off the log. It gets distracting sometimes. If you want to achieve that, you can adjust the log4j.properties file and set logging to OFF. I would be for excluding the config entry because I don't think people will use it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
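As a concrete illustration of the workaround mentioned in the comment above (a sketch only; the exact appender names are assumptions and depend on the project's log4j setup), a log4j.properties override that silences the log output looks like this:

```properties
# log4j 1.x: turn off all log output entirely.
log4j.rootLogger=OFF

# Alternatively, keep only errors visible on the console:
# log4j.rootLogger=ERROR, console
# log4j.appender.console=org.apache.log4j.ConsoleAppender
# log4j.appender.console.layout=org.apache.log4j.PatternLayout
# log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```

Placing such a file on the classpath of the program (e.g. under src/main/resources) suppresses the per-stage job output without any code or flag changes.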
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/957#discussion_r35967492 --- End diff -- Yes, and System.out returns a PrintStream :) ---
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649137#comment-14649137 ] ASF GitHub Bot commented on FLINK-2248: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/957#discussion_r35967492 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java --- @@ -0,0 +1,114 @@ [quoted hunk identical to the one shown earlier in this thread] --- End diff -- Yes, and System.out returns a PrintStream :) Allow
disabling of stdout logging output Key: FLINK-2248 URL: https://issues.apache.org/jira/browse/FLINK-2248 Project: Flink Issue Type: Improvement Reporter: Theodore Vasiloudis Priority: Minor Labels: starter Currently when a job is submitted through the CLI we get in stdout all the log output about each stage in the job. It would be useful to have an easy way to disable this output when submitting the job, as most of the time we are only interested in the log output if something goes wrong. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user sachingoel0101 commented on a diff in the pull request: https://github.com/apache/flink/pull/957#discussion_r35967625 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java --- @@ -0,0 +1,114 @@ [quoted hunk identical to the one shown earlier in this thread] --- End diff -- Ah.
I just assumed it would be some kind of OutputStream.
[jira] [Commented] (FLINK-2452) Add a playcount threshold to the MusicProfiles example
[ https://issues.apache.org/jira/browse/FLINK-2452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14650010#comment-14650010 ] ASF GitHub Bot commented on FLINK-2452: --- GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/968 [FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example This PR adds a user-defined parameter to the MusicProfiles example that filters out songs that a user has listened to only a few times. Essentially, it is a threshold for playcount, above which a user is considered to like a song. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink music-profiles Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/968.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #968 commit c0c8463521912d021c392c2c5edc254fee267eb8 Author: vasia va...@apache.org Date: 2015-07-31T20:12:18Z [FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example Add a playcount threshold to the MusicProfiles example -- Key: FLINK-2452 URL: https://issues.apache.org/jira/browse/FLINK-2452 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.10 Reporter: Vasia Kalavri Assignee: Vasia Kalavri Priority: Minor In the MusicProfiles example, when creating the user-user similarity graph, an edge is created between any 2 users that have listened to the same song (even if once). Depending on the input data, this might produce a projection graph with many more edges than the original user-song graph. To make this computation more efficient, this issue proposes adding a user-defined parameter that filters out songs that a user has listened to only a few times. Essentially, it is a threshold for playcount, above which a user is considered to like a song. 
For reference, with a threshold value of 30, the whole Last.fm dataset is analyzed on my laptop in a few minutes, while no threshold results in a runtime of several hours. There are many solutions to this problem, but since this is just an example (not a library method), I think that keeping it simple is important. Thanks to [~andralungu] for spotting the inefficiency!
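The thresholding the PR describes is just an edge filter over the (user, song, playcount) triplets; in the actual Gelly example it would be a Flink DataSet#filter, but the predicate can be sketched in plain Java. All names below are illustrative, not the example's real types:

```java
import java.util.List;
import java.util.stream.Collectors;

public class PlaycountThresholdSketch {
    // Hypothetical stand-in for the example's (user, song, playcount) triplet.
    record Listen(String user, String song, int playcount) {}

    // Keep only listens whose playcount exceeds the threshold, i.e. songs the
    // user can be assumed to "like". This is the predicate the example would
    // apply before building the user-user similarity projection.
    static List<Listen> likedSongs(List<Listen> listens, int threshold) {
        return listens.stream()
                .filter(l -> l.playcount() > threshold)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Listen> listens = List.of(
                new Listen("u1", "s1", 45),
                new Listen("u1", "s2", 2),
                new Listen("u2", "s1", 31));
        // With threshold 30, only the playcounts 45 and 31 survive.
        System.out.println(likedSongs(listens, 30).size());
    }
}
```

Filtering before the projection matters because the projection's edge count grows roughly with the square of each song's listener count, which is why the unfiltered run takes hours.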
[GitHub] flink pull request: [FLINK-2452] [Gelly] adds a playcount threshol...
GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/968 [FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example This PR adds a user-defined parameter to the MusicProfiles example that filters out songs that a user has listened to only a few times. Essentially, it is a threshold for playcount, above which a user is considered to like a song. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink music-profiles Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/968.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #968 commit c0c8463521912d021c392c2c5edc254fee267eb8 Author: vasia va...@apache.org Date: 2015-07-31T20:12:18Z [FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example
[jira] [Updated] (FLINK-2412) Race leading to IndexOutOfBoundsException when querying for buffer while releasing SpillablePartition
[ https://issues.apache.org/jira/browse/FLINK-2412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-2412: --- Summary: Race leading to IndexOutOfBoundsException when querying for buffer while releasing SpillablePartition (was: Index Out of Bounds Exception) Race leading to IndexOutOfBoundsException when querying for buffer while releasing SpillablePartition - Key: FLINK-2412 URL: https://issues.apache.org/jira/browse/FLINK-2412 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 0.9, 0.10 Reporter: Andra Lungu Assignee: Ufuk Celebi Priority: Critical Fix For: 0.10, 0.9.1 When running code as simple as:
{noformat}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Edge<String, NullValue>> edges = getEdgesDataSet(env);
Graph<String, NullValue, NullValue> graph = Graph.fromDataSet(edges, env);
DataSet<Tuple2<String, Long>> degrees = graph.getDegrees();
degrees.writeAsCsv(outputPath, "\n", ",");
env.execute();
{noformat}
on the Friendster data set (https://snap.stanford.edu/data/com-Friendster.html) on 30 Wally nodes, I get the following exception: java.lang.Exception: The data preparation for task 'CoGroup (CoGroup at inDegrees(Graph.java:701))' caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'wally028.cit.tu-berlin.de/130.149.249.38:53730'. at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:722) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'wally028.cit.tu-berlin.de/130.149.249.38:53730'.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607) at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145) at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466) ... 3 more Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'wally028.cit.tu-berlin.de/130.149.249.38:53730'. at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784) Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'wally028.cit.tu-berlin.de/130.149.249.38:53730'. at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:227) at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126612061 Thanks for adding the tests! This looks good to merge. Could you update https://ci.apache.org/projects/flink/flink-docs-master/apis/cli.html ?
[jira] [Updated] (FLINK-2361) CompactingHashTable loses entries
[ https://issues.apache.org/jira/browse/FLINK-2361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-2361: - Summary: CompactingHashTable loses entries (was: flatMap + distinct gives erroneous results for big data sets) CompactingHashTable loses entries - Key: FLINK-2361 URL: https://issues.apache.org/jira/browse/FLINK-2361 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 0.10 Reporter: Andra Lungu When running the simple Connected Components algorithm (currently in Gelly) on the twitter follower graph, with 1, 100 or 1 iterations, I get the following error: Caused by: java.lang.Exception: Target vertex '657282846' does not exist!. at org.apache.flink.graph.spargel.VertexCentricIteration$VertexUpdateUdfSimpleVV.coGroup(VertexCentricIteration.java:300) at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:220) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139) at org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:107) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:722) Now this is very bizarre, as the DataSet of vertices is produced from the DataSet of edges... Which means there cannot be an edge with an invalid target id... The method calls flatMap to isolate the src and trg ids and distinct to ensure their uniqueness. The algorithm works fine for smaller data sets...
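The derivation Andra describes (a flatMap over the edges emitting both endpoint ids, then a distinct) can be sketched outside Flink in plain Java; in Gelly it is a DataSet flatMap followed by distinct(), and the names below are illustrative:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class VertexFromEdgesSketch {
    // Hypothetical stand-in for a Gelly edge between two vertex ids.
    record Edge(long src, long trg) {}

    // Emit both endpoints of every edge, then deduplicate. By construction,
    // every edge target appears in the resulting vertex set, which is why a
    // "Target vertex does not exist" failure points at the hash table losing
    // entries rather than at this derivation.
    static Set<Long> vertexIds(List<Edge> edges) {
        return edges.stream()
                .flatMap(e -> Stream.of(e.src(), e.trg()))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(new Edge(1, 2), new Edge(2, 3), new Edge(1, 3));
        System.out.println(vertexIds(edges).size());
    }
}
```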
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648960#comment-14648960 ] ASF GitHub Bot commented on FLINK-2248: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126614333 @mxm, updated. Allow disabling of stdout logging output Key: FLINK-2248 URL: https://issues.apache.org/jira/browse/FLINK-2248 Project: Flink Issue Type: Improvement Reporter: Theodore Vasiloudis Priority: Minor Labels: starter Currently when a job is submitted through the CLI we get in stdout all the log output about each stage in the job. It would be useful to have an easy way to disable this output when submitting the job, as most of the time we are only interested in the log output if something goes wrong.
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126614333 @mxm, updated.
[jira] [Updated] (FLINK-2361) CompactingHashTable loses entries
[ https://issues.apache.org/jira/browse/FLINK-2361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-2361: - Priority: Critical (was: Major) CompactingHashTable loses entries - Key: FLINK-2361 URL: https://issues.apache.org/jira/browse/FLINK-2361 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 0.10 Reporter: Andra Lungu Priority: Critical When running the simple Connected Components algorithm (currently in Gelly) on the twitter follower graph, with 1, 100 or 1 iterations, I get the following error: Caused by: java.lang.Exception: Target vertex '657282846' does not exist!. at org.apache.flink.graph.spargel.VertexCentricIteration$VertexUpdateUdfSimpleVV.coGroup(VertexCentricIteration.java:300) at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:220) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139) at org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:107) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:722) Now this is very bizarre, as the DataSet of vertices is produced from the DataSet of edges... Which means there cannot be an edge with an invalid target id... The method calls flatMap to isolate the src and trg ids and distinct to ensure their uniqueness. The algorithm works fine for smaller data sets...
[jira] [Updated] (FLINK-2446) SocketTextStreamFunction has memory leak when reconnecting to server
[ https://issues.apache.org/jira/browse/FLINK-2446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi updated FLINK-2446: -- Affects Version/s: (was: 0.8.1) 0.10 SocketTextStreamFunction has memory leak when reconnecting to server -- Key: FLINK-2446 URL: https://issues.apache.org/jira/browse/FLINK-2446 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.10 Reporter: fangfengbin Priority: Minor
[GitHub] flink pull request: [FLINK-2446]Fix SocketTextStreamFunction has m...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/965#issuecomment-126619190 Good catch, if no objections I will merge this in the evening.
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648971#comment-14648971 ] ASF GitHub Bot commented on FLINK-2248: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126620784 Thanks. One more thing. Could you also update https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html ? You introduced a new config entry. Allow disabling of stdout logging output Key: FLINK-2248 URL: https://issues.apache.org/jira/browse/FLINK-2248 Project: Flink Issue Type: Improvement Reporter: Theodore Vasiloudis Priority: Minor Labels: starter Currently when a job is submitted through the CLI we get in stdout all the log output about each stage in the job. It would be useful to have an easy way to disable this output when submitting the job, as most of the time we are only interested in the log output if something goes wrong.
[GitHub] flink pull request: [FLINK-2248][client]Add flag to disable sysout...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/957#issuecomment-126620784 Thanks. One more thing. Could you also update https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html ? You introduced a new config entry.
[jira] [Commented] (FLINK-2446) SocketTextStreamFunction has memory leak when reconnecting to server
[ https://issues.apache.org/jira/browse/FLINK-2446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648977#comment-14648977 ] ASF GitHub Bot commented on FLINK-2446: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/965#issuecomment-12669 Nice catch! Looks good to merge. SocketTextStreamFunction has memory leak when reconnecting to server -- Key: FLINK-2446 URL: https://issues.apache.org/jira/browse/FLINK-2446 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.10 Reporter: fangfengbin Priority: Minor
[jira] [Commented] (FLINK-2166) Add fromCsvFile() to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-2166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648981#comment-14648981 ] ASF GitHub Bot commented on FLINK-2166: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/939#issuecomment-126624411 Maybe we could move the TableEnvironment to org.apache.flink.api.table to make it clear that it can be used for both languages. Add fromCsvFile() to TableEnvironment - Key: FLINK-2166 URL: https://issues.apache.org/jira/browse/FLINK-2166 Project: Flink Issue Type: New Feature Components: Table API Affects Versions: 0.9 Reporter: Fabian Hueske Priority: Minor Labels: starter Add a {{fromCsvFile()}} method to the {{TableEnvironment}} to read a {{Table}} from a CSV file. The implementation should reuse Flink's CsvInputFormat.
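At its core, the requested fromCsvFile() helper turns delimiter-separated lines into rows; a minimal plain-Java sketch of that core, ignoring everything Flink's CsvInputFormat actually handles (quoting, escaping, typed fields, input splits), with illustrative names:

```java
import java.util.List;
import java.util.stream.Collectors;

public class CsvRowsSketch {
    // Split each CSV line on the field delimiter. A stand-in for the parsing
    // that a fromCsvFile() method would delegate to CsvInputFormat.
    static List<String[]> parse(List<String> lines, String delimiter) {
        return lines.stream()
                .map(line -> line.split(delimiter, -1)) // -1 keeps trailing empty fields
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> rows = parse(List.of("1,Alice", "2,Bob"), ",");
        System.out.println(rows.size() + " rows, " + rows.get(0).length + " fields");
    }
}
```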
[jira] [Commented] (FLINK-2248) Allow disabling of stdout logging output
[ https://issues.apache.org/jira/browse/FLINK-2248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648945#comment-14648945 ] ASF GitHub Bot commented on FLINK-2248: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/957#discussion_r35956433 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendLoggingTest.java ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.apache.flink.client.CliFrontendTestUtils.getConfigDir;
+import static org.junit.Assert.fail;
+
+public class CliFrontendLoggingTest {
+
+    private static LocalFlinkMiniCluster cluster;
+    private static Configuration config;
+    private static String hostPort;
+    private ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    private CliFrontend cli;
+
+    @Before
+    public void setUp() throws Exception {
+        stream.reset();
+        System.setOut(new PrintStream(stream));
--- End diff --

You are overwriting stdout permanently here. You could save `System.out` to a variable here and set it again in the `@After` method.

Allow disabling of stdout logging output Key: FLINK-2248 URL: https://issues.apache.org/jira/browse/FLINK-2248 Project: Flink Issue Type: Improvement Reporter: Theodore Vasiloudis Priority: Minor Labels: starter Currently when a job is submitted through the CLI we get in stdout all the log output about each stage in the job. It would be useful to have an easy way to disable this output when submitting the job, as most of the time we are only interested in the log output if something goes wrong.
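The save-and-restore pattern suggested in the review can be shown in isolation, without JUnit or Flink; the class name is illustrative:

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class StdoutCaptureSketch {
    public static void main(String[] args) {
        // Save the original stream first; System.out is already a PrintStream.
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            // Redirect stdout into the buffer for the duration of the "test".
            System.setOut(new PrintStream(buffer));
            System.out.println("captured");
        } finally {
            // Restore stdout even if the body throws, so later tests are unaffected.
            System.setOut(original);
        }
        System.out.println("captured? " + buffer.toString().trim());
    }
}
```

The try/finally matters: without it, a failing test would leave stdout permanently redirected for every test that runs afterwards, which is exactly the leak the review comment points out.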
[jira] [Created] (FLINK-2450) IndexOutOfBoundsException in KryoSerializer
Ufuk Celebi created FLINK-2450: -- Summary: IndexOutOfBoundsException in KryoSerializer Key: FLINK-2450 URL: https://issues.apache.org/jira/browse/FLINK-2450 Project: Flink Issue Type: Bug Components: Type Serialization System Affects Versions: 0.9, master Reporter: Ufuk Celebi Fix For: 0.10, 0.9.1 The following Exception has been reported by [~andralungu]: {code} Caused by: java.lang.Exception: The data preparation for task 'CoGroup (CoGroup at groupReduceOnNeighbors(Graph.java:1405))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Index: 53, Size: 0 at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:476) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:366) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:576) at java.lang.Thread.run(Thread.java:722) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Index: 53, Size: 0 at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607) at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1096) at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471) ... 
3 more Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Index: 53, Size: 0 at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784) Caused by: java.lang.IndexOutOfBoundsException: Index: 53, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:604) at java.util.ArrayList.get(ArrayList.java:382) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:531) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1328) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781) {code} See FLINK-2412 for example program and data.
[jira] [Commented] (FLINK-2166) Add fromCsvFile() to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-2166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648973#comment-14648973 ] ASF GitHub Bot commented on FLINK-2166: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/939#issuecomment-126621482 Just my opinion, but `TableEnvironment` is located under `org.apache.flink.api.java.table` because of the unification of the Table API implementation. However, the Table API is implemented in Scala, so I think using the Scala API is proper for this. @aljoscha @fhueske What do you think about this? Add fromCsvFile() to TableEnvironment - Key: FLINK-2166 URL: https://issues.apache.org/jira/browse/FLINK-2166 Project: Flink Issue Type: New Feature Components: Table API Affects Versions: 0.9 Reporter: Fabian Hueske Priority: Minor Labels: starter Add a {{fromCsvFile()}} method to the {{TableEnvironment}} to read a {{Table}} from a CSV file. The implementation should reuse Flink's CsvInputFormat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-2166. Add fromCsvFile() method to TableE...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/939#issuecomment-126624411 Maybe we could move the `TableEnvironment` to `org.apache.flink.api.table` to make it clear that it can be used for both languages.
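Independent of which package `TableEnvironment` ends up in, the requested method boils down to reading CSV lines and converting each delimited field to a declared column type. A minimal stdlib-only sketch of that parsing step (all names here are hypothetical illustrations; Flink's actual CsvInputFormat additionally handles input splits, quoting, and type serializers, none of which is modeled):

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the field-parsing step a fromCsvFile() helper would
// wrap: split each line on a delimiter and convert fields to column types.
public class CsvRowsDemo {
    public static List<Object[]> parse(List<String> lines, Class<?>[] types, String delim) {
        List<Object[]> rows = new ArrayList<>();
        for (String line : lines) {
            // -1 keeps trailing empty fields instead of silently dropping them
            String[] fields = line.split(delim, -1);
            Object[] row = new Object[types.length];
            for (int i = 0; i < types.length; i++) {
                // Only Integer vs String is modeled here; a real reader
                // would dispatch on the full set of supported field types.
                row[i] = types[i] == Integer.class ? Integer.valueOf(fields[i]) : fields[i];
            }
            rows.add(row);
        }
        return rows;
    }
}
```

In the actual feature, the schema (the `types` array above) would come from the caller of {{fromCsvFile()}}, and the line iteration would be delegated to the existing CsvInputFormat rather than reimplemented.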
[jira] [Commented] (FLINK-2447) TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
[ https://issues.apache.org/jira/browse/FLINK-2447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14648906#comment-14648906 ] Fabian Hueske commented on FLINK-2447: -- Thanks for the detailed report! [~twalthr] has done a lot of work in the TypeExtractor. Maybe he can comment on this problem. TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type --- Key: FLINK-2447 URL: https://issues.apache.org/jira/browse/FLINK-2447 Project: Flink Issue Type: Bug Components: Java API Reporter: Gabor Gevay Consider the following code: DataSet<FooBarPojo> d1 = env.fromElements(new FooBarPojo()); DataSet<Tuple2<FooBarPojo, FooBarPojo>> d2 = d1.map(new MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>>() { @Override public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception { return null; } }); where FooBarPojo is the following type: public class FooBarPojo { public int foo, bar; public FooBarPojo() {} } This should print a tuple type with two identical fields: Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>> But it prints the following instead: Java Tuple2<PojoType<FooBarPojo, fields = [bar: Integer, foo: Integer]>, GenericType<FooBarPojo>> Note that this problem causes some co-groups in Gelly to crash with org.apache.flink.api.common.InvalidProgramException: The pair of co-group keys are not compatible with each other, when the vertex ID type is a POJO: the second field of the Edge type becomes a generic type while the POJO is correctly recognized in the Vertex type, and getNumberOfKeyFields returns different numbers for the POJO and the generic type. The source of the problem is the mechanism in TypeExtractor that detects recursive types (see the alreadySeen field in TypeExtractor), as it mistakes the second appearance of FooBarPojo for a recursive field.
Specifically, the following happens: createTypeInfoWithTypeHierarchy starts to process the Tuple2<FooBarPojo, FooBarPojo> type, and in line 434 it calls itself for the first field, which proceeds into the privateGetForClass case that correctly detects that it is a POJO and correctly returns a PojoTypeInfo; but in the meantime, in line 1191, privateGetForClass adds the PojoTypeInfo to alreadySeen. Then the outer createTypeInfoWithTypeHierarchy approaches the second field and goes into privateGetForClass, which mistakenly returns a GenericTypeInfo, as it thinks in line 1187 that a recursive type is being processed. (Note that if we comment out the recursive type detection, i.e. the lines that manipulate the alreadySeen field, then the output is correct.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
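The interplay described above can be reduced to a recursion guard that records a type when its analysis starts but never removes it when that analysis finishes, so a second sibling field of the same type is misread as recursion. A stdlib-only sketch with hypothetical names (not the actual TypeExtractor code), contrasting that guard with one that pops the type once the field is fully processed:

```java
import java.util.HashSet;
import java.util.Set;

// Toy recursion guard over a flat list of sibling field types.
public class RecursionGuardDemo {
    // Mirrors the reported bug: types are added to 'alreadySeen' and never
    // removed, so the second field of the same class hits the "recursive
    // type" branch and degrades to a generic type.
    public static String analyzeBuggy(Class<?>[] fields) {
        Set<Class<?>> alreadySeen = new HashSet<>();
        StringBuilder result = new StringBuilder();
        for (Class<?> field : fields) {
            if (!alreadySeen.add(field)) {
                result.append("GenericType;"); // mistaken recursion branch
            } else {
                result.append("PojoType;");
                // bug: 'field' is never removed from alreadySeen here
            }
        }
        return result.toString();
    }

    // Scopes the guard to the duration of one field's analysis (stack-like
    // semantics), so identical sibling fields are each analyzed as POJOs
    // while genuine recursion within one descent would still be caught.
    public static String analyzeFixed(Class<?>[] fields) {
        Set<Class<?>> alreadySeen = new HashSet<>();
        StringBuilder result = new StringBuilder();
        for (Class<?> field : fields) {
            if (!alreadySeen.add(field)) {
                result.append("GenericType;");
            } else {
                result.append("PojoType;");
                alreadySeen.remove(field); // pop after the field is done
            }
        }
        return result.toString();
    }
}
```

The sketch suggests the direction of a fix: the guard should track only the types on the current descent path rather than every type ever visited.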
[jira] [Created] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase
Chesnay Schepler created FLINK-2448: --- Summary: registerCacheFile fails with MultipleProgramsTestbase Key: FLINK-2448 URL: https://issues.apache.org/jira/browse/FLINK-2448 Project: Flink Issue Type: Bug Components: Tests Reporter: Chesnay Schepler Priority: Minor When trying to register a file using a constant name, an exception is thrown saying that the file was already cached. This is probably because the same environment is reused and the cacheFile entries are not cleared between runs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
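The described failure is consistent with a name-keyed registry that rejects duplicate names and is never cleared when the environment is reused across test runs. A stdlib-only sketch of that behavior (hypothetical names, not the actual ExecutionEnvironment code):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of an execution environment's cache-file registry.
public class CacheFileRegistryDemo {
    private final Map<String, String> cacheFiles = new HashMap<>();

    // Registering the same name twice throws, like the reported failure.
    public void registerCachedFile(String path, String name) {
        if (cacheFiles.containsKey(name)) {
            throw new IllegalStateException(
                "File with name '" + name + "' was already cached.");
        }
        cacheFiles.put(name, path);
    }

    // What a reused test environment would need to call between runs.
    public void clearCachedFiles() {
        cacheFiles.clear();
    }
}
```

Under this reading, the fix would be for the test base (or the environment it reuses) to clear the cache-file entries between program executions.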