[GitHub] flink pull request: [FLINK-947] Add a declarative expression API
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/405#issuecomment-74832989 One more thing, the maven module is called "flink-linq". Are we certain that we can use the name LINQ without problems here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-947) Add support for "Named Datasets"
[ https://issues.apache.org/jira/browse/FLINK-947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325617#comment-14325617 ] ASF GitHub Bot commented on FLINK-947: -- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/405#issuecomment-74832989 One more thing, the maven module is called "flink-linq". Are we certain that we can use the name LINQ without problems here? > Add support for "Named Datasets" > > > Key: FLINK-947 > URL: https://issues.apache.org/jira/browse/FLINK-947 > Project: Flink > Issue Type: New Feature > Components: Java API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Minor > > This would create an API that is a mix between SQL like declarativity and the > power of user defined functions. Example user code could look like this: > {code:Java} > NamedDataSet one = ... > NamedDataSet two = ... > NamedDataSet result = one.join(two).where("key").equalTo("otherKey") > .project("a", "b", "c") > .map( (UserTypeIn in) -> return new UserTypeOut(...) ) > .print(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1417] Automatically register types with...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/393#issuecomment-74833886 I would like to merge this pull request soon. @aljoscha, do you agree that we can investigate the performance for the PojoComparator also when the change is merged?
[jira] [Commented] (FLINK-1417) Automatically register nested types at Kryo
[ https://issues.apache.org/jira/browse/FLINK-1417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325629#comment-14325629 ] ASF GitHub Bot commented on FLINK-1417: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/393#issuecomment-74833886 I would like to merge this pull request soon. @aljoscha, do you agree that we can investigate the performance for the PojoComparator also when the change is merged? > Automatically register nested types at Kryo > --- > > Key: FLINK-1417 > URL: https://issues.apache.org/jira/browse/FLINK-1417 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Reporter: Stephan Ewen >Assignee: Robert Metzger > Fix For: 0.9 > > > Currently, the {{GenericTypeInfo}} registers the class of the type at Kryo. > In order to get the best performance, it should recursively walk the classes > and make sure that it registered all contained subtypes.
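The recursive walk that FLINK-1417 describes could be sketched roughly as follows. This is a hypothetical illustration using plain reflection only: `NestedTypeWalker`, its method names, and the filtering rules are invented here for the example and are not the actual `GenericTypeInfo` code; the Kryo registration call itself is elided.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: collect a class and all field types reachable from
// it, so each of them could then be registered with Kryo.
public class NestedTypeWalker {

    public static Set<Class<?>> collectNestedTypes(Class<?> root) {
        Set<Class<?>> seen = new LinkedHashSet<>();
        walk(root, seen);
        return seen;
    }

    private static void walk(Class<?> clazz, Set<Class<?>> seen) {
        if (clazz == null || clazz.isPrimitive() || clazz.getName().startsWith("java.")) {
            return; // skip primitives and JDK types in this sketch
        }
        if (!seen.add(clazz)) {
            return; // already visited; also guards against cyclic types
        }
        for (Field f : clazz.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers()) || f.getType().isPrimitive()) {
                continue;
            }
            walk(f.getType(), seen); // recurse into the contained subtype
        }
    }

    // example POJOs, made up for the demonstration
    static class Address { String street; }
    static class Person { String name; Address address; }

    public static void main(String[] args) {
        Set<Class<?>> types = collectNestedTypes(Person.class);
        System.out.println(types.contains(Person.class));  // true
        System.out.println(types.contains(Address.class)); // true
    }
}
```

In the real implementation one would also need to handle arrays, generics, and subclasses, which plain field reflection alone does not cover.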
[jira] [Closed] (FLINK-1575) JobManagerConnectionTest.testResolveUnreachableActorRemoteHost times out on travis
[ https://issues.apache.org/jira/browse/FLINK-1575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger closed FLINK-1575. - Resolution: Invalid The issue is caused by one of my changes (dependency conflict). I'm closing it as invalid. > JobManagerConnectionTest.testResolveUnreachableActorRemoteHost times out on > travis > -- > > Key: FLINK-1575 > URL: https://issues.apache.org/jira/browse/FLINK-1575 > Project: Flink > Issue Type: Bug >Reporter: Robert Metzger > > This might be related to FLINK-1529. > I saw this issue now at least twice on travis: > https://travis-ci.org/rmetzger/flink/jobs/51108554 > {code} > Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 200.266 sec > <<< FAILURE! - in org.apache.flink.runtime.jobmanager.JobManagerConnectionTest > testResolveUnreachableActorRemoteHost(org.apache.flink.runtime.jobmanager.JobManagerConnectionTest) > Time elapsed: 100.215 sec <<< ERROR! > java.util.concurrent.TimeoutException: Futures timed out after [10 > milliseconds] > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) > at > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) > at > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > at scala.concurrent.Await$.result(package.scala:107) > at akka.remote.Remoting.start(Remoting.scala:173) > at > akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) > at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579) > at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577) > at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588) > at akka.actor.ActorSystem$.apply(ActorSystem.scala:111) > at akka.actor.ActorSystem$.apply(ActorSystem.scala:104) > at akka.actor.ActorSystem$.create(ActorSystem.scala:66) > at > org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:71) > at > 
org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:61) > at > org.apache.flink.runtime.jobmanager.JobManagerConnectionTest.testResolveUnreachableActorRemoteHost(JobManagerConnectionTest.scala:88) > testResolveUnreachableActorLocalHost(org.apache.flink.runtime.jobmanager.JobManagerConnectionTest) > Time elapsed: 100.031 sec <<< ERROR! > java.util.concurrent.TimeoutException: Futures timed out after [10 > milliseconds] > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) > at > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) > at > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > at scala.concurrent.Await$.result(package.scala:107) > at akka.remote.Remoting.start(Remoting.scala:173) > at > akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) > at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579) > at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577) > at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588) > at akka.actor.ActorSystem$.apply(ActorSystem.scala:111) > at akka.actor.ActorSystem$.apply(ActorSystem.scala:104) > at akka.actor.ActorSystem$.create(ActorSystem.scala:66) > at > org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:71) > at > org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:61) > at > org.apache.flink.runtime.jobmanager.JobManagerConnectionTest.testResolveUnreachableActorLocalHost(JobManagerConnectionTest.scala:45) > Running org.apache.flink.runtime.operators.hash.MemoryHashTableTest > [ERROR] [02/17/2015 17:38:04.250] [main] [Remoting] Remoting error: [Startup > timed out] [ > akka.remote.RemoteTransportException: Startup timed out > at > akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129) > at akka.remote.Remoting.start(Remoting.scala:191) > at > 
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) > at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579) > at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577) > at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588) > at akka.actor.ActorSystem$.apply(ActorSystem.scala:111) > at akka.actor.ActorSystem$.apply(ActorSystem.scala:104) > at akka.actor.ActorSystem$.create(ActorSystem.scala:66) > at > org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:71) >
[jira] [Updated] (FLINK-1573) Add per-job metrics to flink.
[ https://issues.apache.org/jira/browse/FLINK-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-1573: -- Fix Version/s: (was: pre-apache) > Add per-job metrics to flink. > - > > Key: FLINK-1573 > URL: https://issues.apache.org/jira/browse/FLINK-1573 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Robert Metzger > > With FLINK-1501, we have JVM specific metrics (mainly monitoring the TMs). > With this task, I would like to add metrics which are job-specific.
[GitHub] flink pull request: [FLINK-947] Add a declarative expression API
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/405#issuecomment-74842606 Yeah, I'm not sure about linq as well. I like the name but realise that it might be problematic. What do the others think? I could call it "flink-expressions". I will add documentation about which types are supported and a good error message for unsupported types as @rmetzger mentioned.
[jira] [Commented] (FLINK-947) Add support for "Named Datasets"
[ https://issues.apache.org/jira/browse/FLINK-947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325686#comment-14325686 ] ASF GitHub Bot commented on FLINK-947: -- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/405#issuecomment-74842606 Yeah, I'm not sure about linq as well. I like the name but realise that it might be problematic. What do the others think? I could call it "flink-expressions". I will add documentation about which types are supported and a good error message for unsupported types as @rmetzger mentioned.
[GitHub] flink pull request: [FLINK-1417] Automatically register types with...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/393#issuecomment-74843783 Yes please, go ahead. But if our Pojo stuff is really that slow we should think about how to improve that or remove it altogether.
[jira] [Commented] (FLINK-1417) Automatically register nested types at Kryo
[ https://issues.apache.org/jira/browse/FLINK-1417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325691#comment-14325691 ] ASF GitHub Bot commented on FLINK-1417: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/393#issuecomment-74843783 Yes please, go ahead. But if our Pojo stuff is really that slow we should think about how to improve that or remove it altogether.
[GitHub] flink pull request: [FLINK-1417] Automatically register types with...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/393#issuecomment-74843995 Yes. First we need to understand why exactly the performance is so poor. Maybe it's an issue we can "easily" fix.
[jira] [Commented] (FLINK-1417) Automatically register nested types at Kryo
[ https://issues.apache.org/jira/browse/FLINK-1417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325693#comment-14325693 ] ASF GitHub Bot commented on FLINK-1417: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/393#issuecomment-74843995 Yes. First we need to understand why exactly the performance is so poor. Maybe it's an issue we can "easily" fix.
[jira] [Created] (FLINK-1577) Misleading error messages when cancelling tasks
Ufuk Celebi created FLINK-1577: -- Summary: Misleading error messages when cancelling tasks Key: FLINK-1577 URL: https://issues.apache.org/jira/browse/FLINK-1577 Project: Flink Issue Type: Improvement Components: Distributed Runtime Affects Versions: master Reporter: Ufuk Celebi A user running a Flink version before bec9c4d ran into a job manager failure (fixed in bec9c4d), which led to restarting the JM and cancelling/clearing all tasks on the TMs. The logs of the TMs were inconclusive. I think part of that has been fixed by now, e.g. there is a log message when cancelAndClearEverything is called, but the task thread (RuntimeEnvironment) always logs an error when interrupted during the run method -- even if the task gets cancelled. I think these error messages are misleading and only the root cause is important (i.e. non-failed tasks should be silently cancelled).
[jira] [Commented] (FLINK-1577) Misleading error messages when cancelling tasks
[ https://issues.apache.org/jira/browse/FLINK-1577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325695#comment-14325695 ] Ufuk Celebi commented on FLINK-1577: {code} 10:56:40,161 ERROR org.apache.flink.runtime.execution.RuntimeEnvironment - Error during running invokable: Event handler interrupted without reaching end-of-superstep. java.lang.RuntimeException: Event handler interrupted without reaching end-of-superstep. {code} {code} 10:56:40,197 ERROR org.apache.flink.runtime.execution.RuntimeEnvironment - Error during running invokable: Bug in reader logic: queried for a buffer although none was available. java.lang.IllegalStateException: Bug in reader logic: queried for a buffer although none was available. at org.apache.flink.runtime.io.network.api.reader.BufferReader.getNextBufferBlocking(BufferReader.java:267) {code}
[GitHub] flink pull request: [FLINK-947] Add a declarative expression API
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/405#issuecomment-74845397 To me, `flink-expressions` sounds much better than `linq` and it mitigates the risk of lawsuits :)
[jira] [Commented] (FLINK-947) Add support for "Named Datasets"
[ https://issues.apache.org/jira/browse/FLINK-947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325705#comment-14325705 ] ASF GitHub Bot commented on FLINK-947: -- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/405#issuecomment-74845397 To me, `flink-expressions` sounds much better than `linq` and it mitigates the risk of lawsuits :)
[jira] [Resolved] (FLINK-1566) WindowIntegrationTest fails
[ https://issues.apache.org/jira/browse/FLINK-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora resolved FLINK-1566. --- Resolution: Fixed > WindowIntegrationTest fails > --- > > Key: FLINK-1566 > URL: https://issues.apache.org/jira/browse/FLINK-1566 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Till Rohrmann >Assignee: Gyula Fora > > I'm seeing spurious failures of the {{WindowIntegrationTest}}. > The error is the following: > {code} > java.lang.AssertionError: > Expected :[[3], [4], [5], [11], [10], [16]] > Actual :[[3], null, [5], [11], [10], [16]] > > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.streaming.api.invokable.operator.windowing.WindowIntegrationTest.validateOutput(WindowIntegrationTest.java:151) > at > org.apache.flink.streaming.api.invokable.operator.windowing.WindowIntegrationTest.test(WindowIntegrationTest.java:134) > {code}
[jira] [Created] (FLINK-1578) Overhaul BLOB manager
Stephan Ewen created FLINK-1578: --- Summary: Overhaul BLOB manager Key: FLINK-1578 URL: https://issues.apache.org/jira/browse/FLINK-1578 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 0.8, 0.9 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 0.9, 0.8 The BLOB manager needs improvements: - Decent failure tests - Better error handling (letting the client know what happened) - Better error logging - Retries upon failed fetches - A bit of control over the maximum number of concurrent connections and the backlog
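The "retries upon failed fetches" item above could take the shape of a generic retry loop such as the one below. This is a minimal sketch under assumed names: the `Fetcher` interface and `MAX_RETRIES` constant are invented for illustration and are not part of the actual BLOB manager code.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical retry wrapper: retry a fetch a fixed number of times,
// rethrowing the last failure if all attempts fail.
public class RetryingFetch {

    interface Fetcher {
        byte[] fetch() throws IOException;
    }

    static final int MAX_RETRIES = 3; // illustrative value

    public static byte[] fetchWithRetries(Fetcher fetcher) throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                return fetcher.fetch();
            } catch (IOException e) {
                last = e; // e.g. a spurious connection reset; try again
            }
        }
        throw last; // all attempts failed
    }

    public static void main(String[] args) throws IOException {
        AtomicInteger calls = new AtomicInteger();
        // simulated fetch that fails twice, then succeeds
        byte[] data = fetchWithRetries(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IOException("Connection reset");
            }
            return new byte[] {42};
        });
        System.out.println(data[0] + " after " + calls.get() + " attempts");
    }
}
```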
[GitHub] flink pull request: [FLINK-1561] [build system] Use a fresh fork f...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/412
[jira] [Resolved] (FLINK-1561) Improve build server robustness by not reusing JVMs in integration tests
[ https://issues.apache.org/jira/browse/FLINK-1561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1561. - Resolution: Fixed Fixed via a8c99d5a8fc6a700a670da5443d9a808ac5eaaa0 > Improve build server robustness by not reusing JVMs in integration tests > > > Key: FLINK-1561 > URL: https://issues.apache.org/jira/browse/FLINK-1561 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.9 > > > Right now, JVM forks are reused across unit and integration tests. While it > makes a lot of sense for the very quick unit tests, the integration tests > benefit less. > We also see spurious JVM failures in the integration tests that are most > likely related to JVM issues. An example is the trace below: > {code} > java.io.IOException: Cannot allocate memory > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205) > at > org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:270) > at > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:440) > {code} > I think we can improve reliability of Travis tests by not reusing forks for > integration tests.
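In a Maven build, not reusing JVM forks is a Surefire/Failsafe configuration switch (`reuseForks`, available since Surefire 2.13). A sketch of what such a change might look like in a pom.xml; the surrounding plugin setup and version are assumed, not copied from Flink's actual pom:

```xml
<!-- Sketch: give each test class a fresh JVM instead of reusing forks.
     Plugin version and surrounding pom structure are assumed here. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <forkCount>1</forkCount>
    <reuseForks>false</reuseForks> <!-- fresh fork per test class -->
  </configuration>
</plugin>
```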
[jira] [Commented] (FLINK-1561) Improve build server robustness by not reusing JVMs in integration tests
[ https://issues.apache.org/jira/browse/FLINK-1561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325760#comment-14325760 ] ASF GitHub Bot commented on FLINK-1561: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/412
[jira] [Resolved] (FLINK-1564) Make sure BLOB client downloads files only once
[ https://issues.apache.org/jira/browse/FLINK-1564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1564. - Resolution: Duplicate Duplicate of [FLINK-1578] > Make sure BLOB client downloads files only once > --- > > Key: FLINK-1564 > URL: https://issues.apache.org/jira/browse/FLINK-1564 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 0.8, 0.9 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Minor > Fix For: 0.9, 0.8 > > > Currently, the BLOB manager may download files multiple times, if concurrent > requests are issued. We should prevent that.
[jira] [Resolved] (FLINK-1562) Introduce retries for fetching data from the BLOB manager
[ https://issues.apache.org/jira/browse/FLINK-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1562. - Resolution: Duplicate Subsumed by [FLINK-1578] > Introduce retries for fetching data from the BLOB manager > - > > Key: FLINK-1562 > URL: https://issues.apache.org/jira/browse/FLINK-1562 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 0.8, 0.9 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.9, 0.8 > > > Fetch requests from the blob manager sometimes fail with connection resets. > The blob client should retry on such a failure to compensate for spurious > fetch failures.
[jira] [Commented] (FLINK-1552) Allow secondary sorts in Vertex Centric Iteration
[ https://issues.apache.org/jira/browse/FLINK-1552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325769#comment-14325769 ] Martin Kiefer commented on FLINK-1552: -- I worked on Approximate Maximum Weight Matchings. Optimal Min/Max Weight Matchings are usually found by variants of the Blossom algorithm; however, the algorithm parallelizes badly. You can obtain a 1/2-approximation with less complex algorithms. Salihoglu and Widom proposed several graph algorithms in their paper "Optimizing Graph Algorithms on Pregel-like Systems" at VLDB 2014. Among these algorithms was a scalable variant of an Approximate Maximum Weight Matching algorithm. http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf I implemented it with Gelly in the plain version presented in the paper. Additionally, we implemented [FLINK-1515], so we could provide an implementation with the optimization the authors called "Edge Cleaning on Demand (ECOD)". I also dirtily implemented a SortedVertexCentricIteration that does address this issue and provided two additional variants making use of secondary sorts. However, one could argue that this is not the most beautiful algorithm for an implementation with Gelly. The algorithm requires you to find the maximum weight edge of vertices and somehow provide them in the next update step. You also need to do something that is equivalent to removing edges from vertices. So, you have to choose between two options, each of which somewhat lacks beauty:

1. Store all edges in the VertexValue. We can then find the maximum vertex value in the update step and store it in the vertex value. We can easily remove edges from the vertex value. This blows up the VertexValue right from the beginning and makes the messaging CoGroup in a VertexCentricIteration senseless.

2. Store the VertexKeys of all removed edges in the VertexValue, plus self messaging. We can find the maximum vertex value in the messaging step, and the vertex can send a message to itself to remember its decision. This hopefully has low cost because the message should not have to go over the network. We only store the vertex keys for deleted edges in the vertex state so we can ignore them in the messaging step.

We chose the latter option. If you are nevertheless interested in this algorithm I can give you access to the code so you can have a look at it. > Allow secondary sorts in Vertex Centric Iteration > - > > Key: FLINK-1552 > URL: https://issues.apache.org/jira/browse/FLINK-1552 > Project: Flink > Issue Type: Wish > Components: Gelly >Reporter: Martin Kiefer >Priority: Minor > > The `VertexCentricIteration` class holds the logic to transform a > `VertexUpdateFunction` and a `MessagingFunction` into an iteration with two > CoGroup operators working on the set of messages and edges. Graph algorithms > can profit from implying an order on the edges or messages based on their > value and/or the vertex ID. This can be implemented easily making use of > secondary sorts. I would suggest extending the `VertexCentricIteration` to > allow to specify these kind of orderings optionally. > For example, this comes handy when it is necessary to find the edges with the > minimum or maximum value or the algorithm requires to pick the edge with > lower vertex ID for edges with equal value. Similar use cases might be found > for orders on the messages.
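The ordering the issue asks for (minimum or maximum edge value, with the lower vertex ID breaking ties at equal values) can be expressed as a plain comparator. A small illustrative sketch follows; `Edge` here is a stand-in type invented for the example, not Gelly's actual edge class, and the comparator stands in for what a secondary sort inside the CoGroup would provide:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the tie-breaking order discussed above: weight descending,
// with the lower vertex ID winning ties at equal weight.
public class EdgeOrderingSketch {

    static class Edge {
        final long targetId;
        final double weight;

        Edge(long targetId, double weight) {
            this.targetId = targetId;
            this.weight = weight;
        }
    }

    // primary key: weight (descending); secondary key: vertex ID (ascending)
    static final Comparator<Edge> SECONDARY_SORT =
            Comparator.comparingDouble((Edge e) -> e.weight).reversed()
                      .thenComparingLong(e -> e.targetId);

    public static void main(String[] args) {
        List<Edge> edges = new ArrayList<>();
        edges.add(new Edge(2, 5.0));
        edges.add(new Edge(1, 5.0));
        edges.add(new Edge(3, 7.0));
        edges.sort(SECONDARY_SORT);
        // the head of the list is the max-weight edge
        System.out.println(edges.get(0).targetId); // 3
        // at equal weight 5.0, vertex 1 precedes vertex 2
        System.out.println(edges.get(1).targetId); // 1
    }
}
```

With such an ordering in place, "pick the maximum weight edge" reduces to reading the first element of each vertex's sorted edge group.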
[jira] [Comment Edited] (FLINK-1552) Allow secondary sorts in Vertex Centric Iteration
[ https://issues.apache.org/jira/browse/FLINK-1552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325769#comment-14325769 ] Martin Kiefer edited comment on FLINK-1552 at 2/18/15 11:47 AM:

I worked on Approximate Maximum Weight Matchings. Optimal Min/Max Weight Matchings are usually found by variants of the Blossom algorithm; however, that algorithm parallelizes badly. You can obtain a 1/2-approximation with less complex algorithms. Salihoglu and Widom proposed several graph algorithms in their paper "Optimizing Graph Algorithms on Pregel-like Systems" at VLDB 2014. Among these algorithms was a scalable variant of an Approximate Maximum Weight Matching algorithm. http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf

I implemented it with Gelly in the plain version presented in the paper. Additionally, we implemented [Flink-1515], so we could provide an implementation with the optimization the authors called "Edge Cleaning on Demand (ECOD)". I also implemented a quick-and-dirty SortedVertexCentricIteration that addresses this issue and provided two additional variants making use of secondary sorts.

However, one could argue that this is not the most beautiful algorithm for an implementation with Gelly. At least not as an easy example. The algorithm requires you to find the maximum weight edge of vertices and somehow provide them in the next update step. You also need to do something that is equivalent to removing edges from vertices. So, you have to choose between two options, of which either one somewhat lacks beauty:

1. Store all edges in the VertexValue. We can then find the maximum vertex value in the update step and store it in the vertex value. We can easily remove edges from the vertex value. This blows up the VertexValue right from the beginning and makes the messaging CoGroup in a VertexCentricIteration senseless.

2. Store the VertexKeys of all removed edges in the VertexValue + self-messaging. We can find the maximum vertex value in the messaging step and the vertex can send a message to itself to remember its decision. This hopefully has low cost because the message should not have to go over the network. We only store the vertex keys for deleted edges in the vertex state so we can ignore them in the messaging step.

We chose the latter option. If you are nevertheless interested in this algorithm I can give you access to the code so you can have a look at it.

was (Author: martinkiefer):

I worked on Approximate Maximum Weight Matchings. Optimal Min/Max Weight Matchings are usually found by variants of the Blossom algorithm; however, that algorithm parallelizes badly. You can obtain a 1/2-approximation with less complex algorithms. Salihoglu and Widom proposed several graph algorithms in their paper "Optimizing Graph Algorithms on Pregel-like Systems" at VLDB 2014. Among these algorithms was a scalable variant of an Approximate Maximum Weight Matching algorithm. http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf

I implemented it with Gelly in the plain version presented in the paper. Additionally, we implemented [Flink-1515], so we could provide an implementation with the optimization the authors called "Edge Cleaning on Demand (ECOD)". I also implemented a quick-and-dirty SortedVertexCentricIteration that addresses this issue and provided two additional variants making use of secondary sorts.

However, one could argue that this is not the most beautiful algorithm for an implementation with Gelly. The algorithm requires you to find the maximum weight edge of vertices and somehow provide them in the next update step. You also need to do something that is equivalent to removing edges from vertices. So, you have to choose between two options, of which either one somewhat lacks beauty:

1. Store all edges in the VertexValue. We can then find the maximum vertex value in the update step and store it in the vertex value. We can easily remove edges from the vertex value. This blows up the VertexValue right from the beginning and makes the messaging CoGroup in a VertexCentricIteration senseless.

2. Store the VertexKeys of all removed edges in the VertexValue + self-messaging. We can find the maximum vertex value in the messaging step and the vertex can send a message to itself to remember its decision. This hopefully has low cost because the message should not have to go over the network. We only store the vertex keys for deleted edges in the vertex state so we can ignore them in the messaging step.

We chose the latter option. If you are nevertheless interested in this algorithm I can give you access to the code so you can have a look at it.

> Allow secondary sorts in Vertex Centric Iteration > - > > Key: FLINK-1552 > URL: https://issues.apache.org/jira/br
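The trade-off behind option 2 can be sketched with plain Java and simplified stand-in types (none of these classes exist in Flink or Gelly; `MatchingState`, `removeEdge`, and `shouldSendAlong` are hypothetical names for illustration only):

```java
import java.util.HashSet;
import java.util.Set;

// Hedged sketch of option 2: instead of materializing all edges in the
// vertex value, the vertex value only tracks the keys of logically
// "removed" edges, and the messaging step filters on that set.
public class RemovedEdgeTracking {

    /** Simplified vertex state: keys of edges the vertex has removed. */
    static class MatchingState {
        final Set<Long> removedTargets = new HashSet<>();
    }

    /** A vertex "removes" an edge by remembering its target key. */
    static void removeEdge(MatchingState state, long targetVertexId) {
        state.removedTargets.add(targetVertexId);
    }

    /** Messaging-step filter: skip edges whose target was removed earlier. */
    static boolean shouldSendAlong(MatchingState state, long targetVertexId) {
        return !state.removedTargets.contains(targetVertexId);
    }

    public static void main(String[] args) {
        MatchingState s = new MatchingState();
        removeEdge(s, 7L);
        System.out.println(shouldSendAlong(s, 7L)); // false: edge is gone
        System.out.println(shouldSendAlong(s, 3L)); // true: still active
    }
}
```

The point of the sketch is that the state grows only with the number of removed edges, not with the full edge list, which is why the comment prefers it over option 1.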
[jira] [Resolved] (FLINK-1548) [DISCUSS] Make Scala implicit parameters explicit in the runtime
[ https://issues.apache.org/jira/browse/FLINK-1548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1548. - Resolution: Fixed Assignee: Stephan Ewen Resolved through mailing list discussion (see archive: http://mail-archives.apache.org/mod_mbox/flink-dev/201502.mbox/%3C48F7533F-FA7B-450F-8DD0-383FF8230349%40kth.se%3E). Moved to Wiki Page https://cwiki.apache.org/confluence/display/FLINK/Coding+Guidelines+for+Scala > [DISCUSS] Make Scala implicit parameters explicit in the runtime > > > Key: FLINK-1548 > URL: https://issues.apache.org/jira/browse/FLINK-1548 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Minor > Fix For: 0.9 > > > Scala's feature of implicit parameters is very powerful and invaluable in the > design of nice high level APIs. > In the system runtime, though, I think we should not use implicit > parameters, as they make the code more tricky to understand and make it > harder to figure out where parameters actually come from. The API niceties > are not required there. I propose to make all parameters explicit in runtime > classes. Right now, this concerns mostly ActorSystem and Timeout parameters. > This is nothing we need to do as a separate task, I would suggest to change > that whenever we encounter such a method.
[jira] [Issue Comment Deleted] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi updated FLINK-1421: -- Comment: was deleted (was: When your program is distributed, one of the workers does not have HelloWorldContentEvent in its classpath. As a first workaround try copying the jar to the lib folder of all of your machines. Not a nice solution, but should work.) > Implement a SAMOA Adapter for Flink Streaming > - > > Key: FLINK-1421 > URL: https://issues.apache.org/jira/browse/FLINK-1421 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Paris Carbone >Assignee: Paris Carbone > Original Estimate: 336h > Remaining Estimate: 336h > > Yahoo's Samoa is an experimental incremental machine learning library that > builds on an abstract compositional data streaming model to write streaming > algorithms. The task is to provide an adapter from SAMOA topologies to > Flink-streaming job graphs in order to support Flink as a backend engine for > SAMOA tasks. > A startup guide can be viewed here: > https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub > The main working branch of the adapter: > https://github.com/senorcarbone/samoa/tree/flink
[jira] [Commented] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325808#comment-14325808 ] Márton Balassi commented on FLINK-1421: --- When your program is distributed, one of the workers does not have HelloWorldContentEvent in its classpath. As a first workaround try copying the jar to the lib folder of all of your machines. Not a nice solution, but should work. > Implement a SAMOA Adapter for Flink Streaming > - > > Key: FLINK-1421 > URL: https://issues.apache.org/jira/browse/FLINK-1421 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Paris Carbone >Assignee: Paris Carbone > Original Estimate: 336h > Remaining Estimate: 336h > > Yahoo's Samoa is an experimental incremental machine learning library that > builds on an abstract compositional data streaming model to write streaming > algorithms. The task is to provide an adapter from SAMOA topologies to > Flink-streaming job graphs in order to support Flink as a backend engine for > SAMOA tasks. > A startup guide can be viewed here: > https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub > The main working branch of the adapter: > https://github.com/senorcarbone/samoa/tree/flink
[jira] [Commented] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325807#comment-14325807 ] Márton Balassi commented on FLINK-1421: --- When your program is distributed, one of the workers does not have HelloWorldContentEvent in its classpath. As a first workaround try copying the jar to the lib folder of all of your machines. Not a nice solution, but should work. > Implement a SAMOA Adapter for Flink Streaming > - > > Key: FLINK-1421 > URL: https://issues.apache.org/jira/browse/FLINK-1421 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Paris Carbone >Assignee: Paris Carbone > Original Estimate: 336h > Remaining Estimate: 336h > > Yahoo's Samoa is an experimental incremental machine learning library that > builds on an abstract compositional data streaming model to write streaming > algorithms. The task is to provide an adapter from SAMOA topologies to > Flink-streaming job graphs in order to support Flink as a backend engine for > SAMOA tasks. > A startup guide can be viewed here: > https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub > The main working branch of the adapter: > https://github.com/senorcarbone/samoa/tree/flink
[GitHub] flink pull request: [FLINK-1555] Add serializer hierarchy debug ut...
GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/415 [FLINK-1555] Add serializer hierarchy debug util You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink1555 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/415.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #415 commit 9e640e520c3120bea32798daf42ee64b10dd8655 Author: Robert Metzger Date: 2015-02-18T10:39:24Z [FLINK-1555] Add serializer hierarchy debug util --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1555) Add utility to log the serializers of composite types
[ https://issues.apache.org/jira/browse/FLINK-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325820#comment-14325820 ] ASF GitHub Bot commented on FLINK-1555: --- GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/415 [FLINK-1555] Add serializer hierarchy debug util You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink1555 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/415.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #415 commit 9e640e520c3120bea32798daf42ee64b10dd8655 Author: Robert Metzger Date: 2015-02-18T10:39:24Z [FLINK-1555] Add serializer hierarchy debug util > Add utility to log the serializers of composite types > - > > Key: FLINK-1555 > URL: https://issues.apache.org/jira/browse/FLINK-1555 > Project: Flink > Issue Type: Improvement >Reporter: Robert Metzger >Priority: Minor > > Users affected by poor performance might want to understand how Flink is > serializing their data. > Therefore, it would be cool to have a tool utility which logs the serializers > like this: > {{SerializerUtils.getSerializers(TypeInformation t);}} > to get > {code} > PojoSerializer > TupleSerializer > IntSer > DateSer > GenericTypeSer(java.sql.Date) > PojoSerializer > GenericTypeSer(HashMap) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
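The indented printout the issue asks for amounts to a recursive walk over a tree of serializers. A minimal illustration with a stand-in `Node` type (Flink's real `TypeSerializer` classes are not modeled here; this only shows the shape of the recursion that would produce output like the example in the issue):

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: a recursive pretty-printer over a toy tree,
// mirroring the indented serializer hierarchy proposed in FLINK-1555.
public class SerializerTreePrinter {

    /** Stand-in for a (possibly composite) serializer. */
    static final class Node {
        final String name;
        final List<Node> children;
        Node(String name, Node... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    /** Render the tree with one space of indentation per nesting level. */
    static String render(Node node, int depth) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < depth; i++) {
            sb.append(' ');
        }
        sb.append(node.name).append('\n');
        for (Node child : node.children) {
            sb.append(render(child, depth + 1));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Node root = new Node("PojoSerializer",
                new Node("TupleSerializer", new Node("IntSer"), new Node("DateSer")),
                new Node("GenericTypeSer(java.sql.Date)"));
        System.out.print(render(root, 0));
    }
}
```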
[jira] [Commented] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325823#comment-14325823 ] Stephan Ewen commented on FLINK-1421: - My guess is that the Samoa adapter (specifically the SamoaTypeSerializer) has not implemented the dynamic class loading. It looks like all classes are always sent through standard Java serialization. During deserialization, the java.io.ObjectInputStream needs to resolve the classes it encounters and it uses some class loader for that. It is important that this class loader is the usercode class loader. You can usually grab this through {{Thread.currentThread().getContextClassLoader()}}. BTW: I think that this is an extremely inefficient way of exchanging data. While feasible for a first prototype, this should be on the list to be improved. Here is the stack trace in a nicer format {code} org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.RuntimeException: org.apache.commons.lang.SerializationException: java.lang.ClassNotFoundException: com.yahoo.labs.samoa.examples.HelloWorldContentEvent at org.apache.flink.streaming.api.streamvertex.OutputHandler.invokeUserFunction(OutputHandler.java:232) at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:121) at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.commons.lang.SerializationException: java.lang.ClassNotFoundException: com.yahoo.labs.samoa.examples.HelloWorldContentEvent at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:165) at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:192) at com.yahoo.labs.flink.SamoaTypeSerializer.deserialize(SamoaTypeSerializer.java:84) at com.yahoo.labs.flink.SamoaTypeSerializer.deserialize(SamoaTypeSerializer.java:33) at 
org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:107) at org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29) at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57) at org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111) at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:66) at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:33) at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59) at org.apache.flink.streaming.api.invokable.StreamInvokable.readNext(StreamInvokable.java:102) at com.yahoo.labs.flink.topology.impl.FlinkProcessingItem.invoke(FlinkProcessingItem.java:143) at org.apache.flink.streaming.api.streamvertex.StreamVertex.invokeUserFunction(StreamVertex.java:85) at org.apache.flink.streaming.api.streamvertex.OutputHandler.invokeUserFunction(OutputHandler.java:229) ... 
3 more Caused by: java.lang.ClassNotFoundException: com.yahoo.labs.samoa.examples.HelloWorldContentEvent at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:360) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:340) at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:162) ... 17 more at org.apache.flink.client.program.Client.run(Client.java:345) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:49) at com.yahoo.labs.flink.FlinkDoTask.main(FlinkDoTask.java:88) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.i
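The fix Stephan suggests — resolving classes against the user-code class loader during Java deserialization — typically means overriding `ObjectInputStream.resolveClass`. A self-contained sketch, assuming the caller passes in the right loader (inside a Flink task that would be the user-code class loader, e.g. obtained via `Thread.currentThread().getContextClassLoader()`; here a plain String round-trip stands in for a SAMOA event):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Hedged sketch: an ObjectInputStream that resolves classes against an
// explicit class loader instead of the default one the JDK picks.
public class ClassLoaderAwareDeserializer {

    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Delegate class resolution to the supplied loader, which is
            // where user-code classes like HelloWorldContentEvent would live.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static Object deserialize(byte[] bytes, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new LoaderObjectInputStream(new ByteArrayInputStream(bytes), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject("hello");
        }
        Object back = deserialize(bos.toByteArray(),
                Thread.currentThread().getContextClassLoader());
        System.out.println(back);
    }
}
```

With this pattern, the `ClassNotFoundException` above would be avoided as long as the supplied loader can see the user-code jar.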
[jira] [Created] (FLINK-1579) Create a Flink History Server
Robert Metzger created FLINK-1579: - Summary: Create a Flink History Server Key: FLINK-1579 URL: https://issues.apache.org/jira/browse/FLINK-1579 Project: Flink Issue Type: New Feature Affects Versions: 0.9 Reporter: Robert Metzger Right now it's not possible to analyze the job results for jobs that ran on YARN, because we'll lose the information once the JobManager has stopped. Therefore, I propose to implement a "Flink History Server" which serves the results from these jobs. I haven't started thinking about the implementation, but I suspect it involves some JSON files stored in HDFS :)
[jira] [Commented] (FLINK-1579) Create a Flink History Server
[ https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325847#comment-14325847 ] Stephan Ewen commented on FLINK-1579: - How much is there in terms of Hadoop History server, or YARN history server? > Create a Flink History Server > - > > Key: FLINK-1579 > URL: https://issues.apache.org/jira/browse/FLINK-1579 > Project: Flink > Issue Type: New Feature >Affects Versions: 0.9 >Reporter: Robert Metzger > > Right now it's not possible to analyze the job results for jobs that ran on > YARN, because we'll lose the information once the JobManager has stopped. > Therefore, I propose to implement a "Flink History Server" which serves the > results from these jobs. > I haven't started thinking about the implementation, but I suspect it > involves some JSON files stored in HDFS :)
[jira] [Commented] (FLINK-1579) Create a Flink History Server
[ https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325851#comment-14325851 ] Robert Metzger commented on FLINK-1579: --- We have to investigate that: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/TimelineServer.html > Create a Flink History Server > - > > Key: FLINK-1579 > URL: https://issues.apache.org/jira/browse/FLINK-1579 > Project: Flink > Issue Type: New Feature >Affects Versions: 0.9 >Reporter: Robert Metzger > > Right now it's not possible to analyze the job results for jobs that ran on > YARN, because we'll lose the information once the JobManager has stopped. > Therefore, I propose to implement a "Flink History Server" which serves the > results from these jobs. > I haven't started thinking about the implementation, but I suspect it > involves some JSON files stored in HDFS :)
[GitHub] flink pull request: [builds] Allow to upload build artifacts to S3
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/407#issuecomment-74864911 Very good idea. +1 from my side
[jira] [Commented] (FLINK-1417) Automatically register nested types at Kryo
[ https://issues.apache.org/jira/browse/FLINK-1417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325864#comment-14325864 ] ASF GitHub Bot commented on FLINK-1417: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/393#issuecomment-74867033 Merging it now > Automatically register nested types at Kryo > --- > > Key: FLINK-1417 > URL: https://issues.apache.org/jira/browse/FLINK-1417 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Reporter: Stephan Ewen >Assignee: Robert Metzger > Fix For: 0.9 > > > Currently, the {{GenericTypeInfo}} registers the class of the type at Kryo. > In order to get the best performance, it should recursively walk the classes > and make sure that it registered all contained subtypes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1417] Automatically register types with...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/393#issuecomment-74867033 Merging it now
[jira] [Resolved] (FLINK-1578) Overhaul BLOB manager
[ https://issues.apache.org/jira/browse/FLINK-1578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-1578. - Resolution: Fixed Fixed via cfce493feb70a49d2722dc2a0d79f845f7e0461a > Overhaul BLOB manager > - > > Key: FLINK-1578 > URL: https://issues.apache.org/jira/browse/FLINK-1578 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 0.8, 0.9 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.9, 0.8 > > > The BLOB manager need improvements: > - Decent failure tests > - Better error handling (letting the client know what happened) > - Better error logging > - Retries upon failed fetches > - A bit of control over the maximum number of concurrent connections and the > backlog -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1201) Graph API for Flink
[ https://issues.apache.org/jira/browse/FLINK-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325885#comment-14325885 ] Stephan Ewen commented on FLINK-1201: - I think we can close this, now that Gelly is in. > Graph API for Flink > > > Key: FLINK-1201 > URL: https://issues.apache.org/jira/browse/FLINK-1201 > Project: Flink > Issue Type: New Feature >Reporter: Kostas Tzoumas >Assignee: Vasia Kalavri > > This issue tracks the development of a Graph API/DSL for Flink. > Until the code is pushed to the Flink repository, collaboration is happening > here: https://github.com/project-flink/flink-graph -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1201) Graph API for Flink
[ https://issues.apache.org/jira/browse/FLINK-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325918#comment-14325918 ] Vasia Kalavri commented on FLINK-1201: -- sure, I'll close it :) > Graph API for Flink > > > Key: FLINK-1201 > URL: https://issues.apache.org/jira/browse/FLINK-1201 > Project: Flink > Issue Type: New Feature >Reporter: Kostas Tzoumas >Assignee: Vasia Kalavri > > This issue tracks the development of a Graph API/DSL for Flink. > Until the code is pushed to the Flink repository, collaboration is happening > here: https://github.com/project-flink/flink-graph -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-1201) Graph API for Flink
[ https://issues.apache.org/jira/browse/FLINK-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri closed FLINK-1201. Resolution: Implemented > Graph API for Flink > > > Key: FLINK-1201 > URL: https://issues.apache.org/jira/browse/FLINK-1201 > Project: Flink > Issue Type: New Feature >Reporter: Kostas Tzoumas >Assignee: Vasia Kalavri > > This issue tracks the development of a Graph API/DSL for Flink. > Until the code is pushed to the Flink repository, collaboration is happening > here: https://github.com/project-flink/flink-graph -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1388) POJO support for writeAsCsv
[ https://issues.apache.org/jira/browse/FLINK-1388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325971#comment-14325971 ] Robert Metzger commented on FLINK-1388: --- Yes, adding additional tests is always good. > POJO support for writeAsCsv > --- > > Key: FLINK-1388 > URL: https://issues.apache.org/jira/browse/FLINK-1388 > Project: Flink > Issue Type: New Feature > Components: Java API >Reporter: Timo Walther >Assignee: Adnan Khan >Priority: Minor > > It would be great if one could simply write out POJOs in CSV format. > {code} > public class MyPojo { >String a; >int b; > } > {code} > to: > {code} > # CSV file of org.apache.flink.MyPojo: String a, int b > "Hello World", 42 > "Hello World 2", 47 > ... > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1388) POJO support for writeAsCsv
[ https://issues.apache.org/jira/browse/FLINK-1388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14325974#comment-14325974 ] Robert Metzger commented on FLINK-1388: --- - The {{DataSet.internalWriteAsCsv}} currently checks if the DataSet type is a Tuple. I would extend the functionality to also allow POJOs there. - {{TypeExtractor.createTypeInfo()}} is meant to be called only when the job is being created on the client side ("pre flight" phase). > POJO support for writeAsCsv > --- > > Key: FLINK-1388 > URL: https://issues.apache.org/jira/browse/FLINK-1388 > Project: Flink > Issue Type: New Feature > Components: Java API >Reporter: Timo Walther >Assignee: Adnan Khan >Priority: Minor > > It would be great if one could simply write out POJOs in CSV format. > {code} > public class MyPojo { >String a; >int b; > } > {code} > to: > {code} > # CSV file of org.apache.flink.MyPojo: String a, int b > "Hello World", 42 > "Hello World 2", 47 > ... > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
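A reflection-based field walk is the kind of thing a POJO-aware `writeAsCsv` would need. The following is an illustrative sketch under assumptions, not Flink's implementation: `toCsvLine` is a hypothetical helper, it only handles public fields, and it quotes strings the way the issue's example output does:

```java
import java.lang.reflect.Field;
import java.util.StringJoiner;

// Hedged sketch: turn a POJO's public fields into one CSV line.
public class PojoCsvSketch {

    /** The POJO from the issue description. */
    public static class MyPojo {
        public String a;
        public int b;
        public MyPojo(String a, int b) { this.a = a; this.b = b; }
    }

    static String toCsvLine(Object pojo) throws IllegalAccessException {
        StringJoiner joiner = new StringJoiner(", ");
        for (Field field : pojo.getClass().getFields()) {
            Object value = field.get(pojo);
            // Quote strings, print everything else verbatim.
            joiner.add(value instanceof String
                    ? "\"" + value + "\"" : String.valueOf(value));
        }
        return joiner.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(toCsvLine(new MyPojo("Hello World", 42)));
    }
}
```

Note that `getFields()` does not guarantee a field order, which is one of the details a real implementation (presumably reusing the PojoTypeInfo's field list) would have to pin down.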
[GitHub] flink pull request: [FLINK-1417] Automatically register types with...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/393
[jira] [Commented] (FLINK-1417) Automatically register nested types at Kryo
[ https://issues.apache.org/jira/browse/FLINK-1417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326043#comment-14326043 ] ASF GitHub Bot commented on FLINK-1417: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/393 > Automatically register nested types at Kryo > --- > > Key: FLINK-1417 > URL: https://issues.apache.org/jira/browse/FLINK-1417 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Reporter: Stephan Ewen >Assignee: Robert Metzger > Fix For: 0.9 > > > Currently, the {{GenericTypeInfo}} registers the class of the type at Kryo. > In order to get the best performance, it should recursively walk the classes > and make sure that it registered all contained subtypes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-1395) Add Jodatime support to Kryo
[ https://issues.apache.org/jira/browse/FLINK-1395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-1395. --- Resolution: Fixed Fix Version/s: 0.9 Resolved for master 0.9 in http://git-wip-us.apache.org/repos/asf/flink/commit/5015ab49 > Add Jodatime support to Kryo > > > Key: FLINK-1395 > URL: https://issues.apache.org/jira/browse/FLINK-1395 > Project: Flink > Issue Type: Sub-task >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 0.9 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-1392) Serializing Protobuf - issue 1
[ https://issues.apache.org/jira/browse/FLINK-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-1392. --- Resolution: Fixed Fix Version/s: 0.9 Resolved for 0.9 in master with commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77c45484 > Serializing Protobuf - issue 1 > -- > > Key: FLINK-1392 > URL: https://issues.apache.org/jira/browse/FLINK-1392 > Project: Flink > Issue Type: Sub-task >Affects Versions: 0.8, 0.9 >Reporter: Felix Neutatz >Assignee: Robert Metzger >Priority: Minor > Fix For: 0.9, 0.8.1 > > > Hi, I started to experiment with Parquet using Protobuf. > When I use the standard Protobuf class: > com.twitter.data.proto.tutorial.AddressBookProtos > The code which I run, can be found here: > [https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java] > I get the following exception: > {code:xml} > Exception in thread "main" java.lang.Exception: Deserializing the > InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: > Could not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could > not read the user code wrapper: Error while deserializing element from > collection > at > org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285) > at > org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57) > ... 
25 more > Caused by: java.io.IOException: Error while deserializing element from > collection > at > org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) > at java.io.
[jira] [Resolved] (FLINK-1417) Automatically register nested types at Kryo
[ https://issues.apache.org/jira/browse/FLINK-1417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-1417. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/354efec0 > Automatically register nested types at Kryo > --- > > Key: FLINK-1417 > URL: https://issues.apache.org/jira/browse/FLINK-1417 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Reporter: Stephan Ewen >Assignee: Robert Metzger > Fix For: 0.9 > > > Currently, the {{GenericTypeInfo}} registers the class of the type at Kryo. > In order to get the best performance, it should recursively walk the classes > and make sure that it registered all contained subtypes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
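The recursive registration described in FLINK-1417 can be illustrated with plain reflection. This is a standalone sketch, not the actual {{GenericTypeInfo}} code; all class and method names are hypothetical, and the real implementation would also need to handle generic parameters and subclasses:

```java
import java.lang.reflect.Field;
import java.util.LinkedHashSet;
import java.util.Set;

public class NestedTypeWalker {

    // Recursively collect a type and the types of all its (transitive) fields,
    // skipping primitives, arrays, and JDK classes, so that each discovered
    // class could be registered at Kryo instead of only the root class.
    public static Set<Class<?>> collectNestedTypes(Class<?> root) {
        Set<Class<?>> seen = new LinkedHashSet<>();
        walk(root, seen);
        return seen;
    }

    private static void walk(Class<?> clazz, Set<Class<?>> seen) {
        if (clazz == null || clazz.isPrimitive() || clazz.isArray()
                || clazz.getName().startsWith("java.") || !seen.add(clazz)) {
            return; // the seen-check also terminates cyclic type references
        }
        for (Field field : clazz.getDeclaredFields()) {
            walk(field.getType(), seen);
        }
    }

    // Hypothetical nested POJOs for the demo.
    static class Address { String street; int zip; }
    static class Person { Address address; long id; }

    public static void main(String[] args) {
        // Person and Address are discovered; String/int/long are skipped.
        System.out.println(collectNestedTypes(Person.class));
    }
}
```

Registering every class found by such a walk avoids Kryo falling back to writing fully qualified class names for unregistered nested types.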
[jira] [Resolved] (FLINK-1391) Kryo fails to properly serialize avro collection types
[ https://issues.apache.org/jira/browse/FLINK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-1391. --- Resolution: Fixed Fix Version/s: 0.9 Resolved for 0.9 into master with http://git-wip-us.apache.org/repos/asf/flink/commit/7e39bc67 > Kryo fails to properly serialize avro collection types > -- > > Key: FLINK-1391 > URL: https://issues.apache.org/jira/browse/FLINK-1391 > Project: Flink > Issue Type: Sub-task >Affects Versions: 0.8, 0.9 >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 0.9, 0.8.1 > > > Before FLINK-610, Avro was the default generic serializer. > Now, special types coming from Avro are handled by Kryo .. which seems to > cause errors like: > {code} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: > java.lang.NullPointerException > at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200) > at > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) > at > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:143) > at > org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:148) > at > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:244) > at > org.apache.flink.runtime.plugable.DeserializationDelegate.read(DeserializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordDeserializer.getNextRecord(AdaptiveSpanningRecordDeserializer.java:71) > at > org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:189) > at > org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176) > at > 
org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51) > at > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53) > at > org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170) > at > org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257) > at java.lang.Thread.run(Thread.java:744) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1567) Add option to switch between Avro and Kryo serialization for GenericTypes
[ https://issues.apache.org/jira/browse/FLINK-1567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326054#comment-14326054 ] Robert Metzger commented on FLINK-1567: --- Resolved for 0.9 in master in http://git-wip-us.apache.org/repos/asf/flink/commit/e6754a6d > Add option to switch between Avro and Kryo serialization for GenericTypes > - > > Key: FLINK-1567 > URL: https://issues.apache.org/jira/browse/FLINK-1567 > Project: Flink > Issue Type: Improvement >Affects Versions: 0.8, 0.9 >Reporter: Robert Metzger > > Allow users to switch the underlying serializer for GenericTypes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-1567) Add option to switch between Avro and Kryo serialization for GenericTypes
[ https://issues.apache.org/jira/browse/FLINK-1567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-1567: - Assignee: Robert Metzger > Add option to switch between Avro and Kryo serialization for GenericTypes > - > > Key: FLINK-1567 > URL: https://issues.apache.org/jira/browse/FLINK-1567 > Project: Flink > Issue Type: Improvement >Affects Versions: 0.8, 0.9 >Reporter: Robert Metzger >Assignee: Robert Metzger > > Allow users to switch the underlying serializer for GenericTypes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1466) Add InputFormat to read HCatalog tables
[ https://issues.apache.org/jira/browse/FLINK-1466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326134#comment-14326134 ] ASF GitHub Bot commented on FLINK-1466: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/411#issuecomment-74894556 @rmetzger Thanks for the feedback. Added support for complex types to the Flink tuple mode and tested it on a local cluster setup. > Add InputFormat to read HCatalog tables > --- > > Key: FLINK-1466 > URL: https://issues.apache.org/jira/browse/FLINK-1466 > Project: Flink > Issue Type: New Feature > Components: Java API, Scala API >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Minor > > HCatalog is a metadata repository and InputFormat to make Hive tables > accessible to other frameworks such as Pig. > Adding support for HCatalog would give access to Hive managed data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1466] Add HCatInputFormats to read from...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/411#issuecomment-74894556 @rmetzger Thanks for the feedback. Added support for complex types to the Flink tuple mode and tested it on a local cluster setup. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1466) Add InputFormat to read HCatalog tables
[ https://issues.apache.org/jira/browse/FLINK-1466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326147#comment-14326147 ] ASF GitHub Bot commented on FLINK-1466: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/411#issuecomment-74895705 Cool. Then I think the change is good to merge. > Add InputFormat to read HCatalog tables > --- > > Key: FLINK-1466 > URL: https://issues.apache.org/jira/browse/FLINK-1466 > Project: Flink > Issue Type: New Feature > Components: Java API, Scala API >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Minor > > HCatalog is a metadata repository and InputFormat to make Hive tables > accessible to other frameworks such as Pig. > Adding support for HCatalog would give access to Hive managed data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1466] Add HCatInputFormats to read from...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/411#issuecomment-74895705 Cool. Then I think the change is good to merge.
[jira] [Created] (FLINK-1580) Cleanup TaskManager initialization logic
Till Rohrmann created FLINK-1580: Summary: Cleanup TaskManager initialization logic Key: FLINK-1580 URL: https://issues.apache.org/jira/browse/FLINK-1580 Project: Flink Issue Type: Bug Reporter: Till Rohrmann Currently, the TaskManager initializes many heavyweight objects upon registration at the JobManager. If an exception occurs during the initialization, it takes quite long until the {{JobManager}} detects the {{TaskManager}} failure. Therefore, it would be better if we could rearrange the initialization logic so that the {{TaskManager}} only registers at the {{JobManager}} if all objects could be initialized successfully. Moreover, it would be worthwhile to move some of the initialization work out of the {{TaskManager}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
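The ordering proposed in FLINK-1580 — initialize every heavyweight component first, and only register with the master when all of them came up — can be sketched as follows. All names are hypothetical; this is not the actual TaskManager code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class EagerInitRegistration {

    interface Component { }

    // Build all components before registering. A failure surfaces immediately
    // at startup instead of after a long master-side failure-detection timeout.
    public static boolean initThenRegister(List<Supplier<Component>> factories,
                                           Runnable registerAtMaster) {
        List<Component> components = new ArrayList<>();
        try {
            for (Supplier<Component> factory : factories) {
                components.add(factory.get()); // may throw: fail before registering
            }
        } catch (RuntimeException e) {
            return false; // initialization failed, registration never attempted
        }
        registerAtMaster.run(); // only reached when everything is up
        return true;
    }

    public static void main(String[] args) {
        boolean registered = initThenRegister(
                List.of(() -> new Component() { },
                        () -> { throw new RuntimeException("disk full"); }),
                () -> System.out.println("registered at master"));
        System.out.println(registered);
    }
}
```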
[jira] [Created] (FLINK-1581) Configure DeathWatch parameters properly
Till Rohrmann created FLINK-1581: Summary: Configure DeathWatch parameters properly Key: FLINK-1581 URL: https://issues.apache.org/jira/browse/FLINK-1581 Project: Flink Issue Type: Bug Reporter: Till Rohrmann We are using Akka's DeathWatch mechanism to detect failed components. However, the interval until an {{Instance}} is marked dead is currently very long. Especially in conjunction with the job restarting mechanism, we should devise a mechanism which either quickly detects dead {{Instance}}s or sets the interval, pause and threshold values such that the detection does not take longer than the Akka ask timeout value. Otherwise, all retries might be consumed before an {{Instance}} is recognized as dead. Further investigation of the correct failure behavior is necessary. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326190#comment-14326190 ] Fay Beligianni commented on FLINK-1421: --- Hey Stefan, Thank you very much for the response! Indeed that was the problem. Regarding the Java serialization, yes it is not efficient but when we tried to use the Kryo serializer we were encountering problems with the tuples that we are streaming between the invokables. Specifically, because we are streaming Tuple3 elements, where UserDefinedClass is a custom implementation of the Samoa "ContentEvent" interface, we couldn't pass the TypeInformation of the custom object to the serializer, thus we had to use Java serialization. For sure though we will try to come up with a more efficient solution for that issue. P.S.: Thanks for the formatting hint! > Implement a SAMOA Adapter for Flink Streaming > - > > Key: FLINK-1421 > URL: https://issues.apache.org/jira/browse/FLINK-1421 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Paris Carbone >Assignee: Paris Carbone > Original Estimate: 336h > Remaining Estimate: 336h > > Yahoo's Samoa is an experimental incremental machine learning library that > builds on an abstract compositional data streaming model to write streaming > algorithms. The task is to provide an adapter from SAMOA topologies to > Flink-streaming job graphs in order to support Flink as a backend engine for > SAMOA tasks. > A startup guide can be viewed here : > https://docs.google.com/document/d/18glDJDYmnFGT1UGtZimaxZpGeeg1Ch14NgDoymhPk2A/pub > The main working branch of the adapter : > https://github.com/senorcarbone/samoa/tree/flink -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326189#comment-14326189 ] Fay Beligianni commented on FLINK-1421: --- Hey Stefan, Thank you very much for the response! Indeed that was the problem. Regarding the Java serialization, yes it is not efficient but when we tried to use the Kryo serializer we were encountering problems with the tuples that we are streaming between the invokables. Specifically, because we are streaming Tuple3 elements, where UserDefinedClass is a custom implementation of the Samoa "ContentEvent" interface, we couldn't pass the TypeInformation of the custom object to the serializer, thus we had to use Java serialization. For sure though we will try to come up with a more efficient solution for that issue. P.S.: Thanks for the formatting hint!
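The TypeInformation problem described in this comment stems from Java's type erasure: a plain {{Tuple3}} reference carries no runtime information about its parameters. The usual workaround, which serializer frameworks rely on, is an anonymous-subclass "type hint". A standalone sketch (the {{TypeHint}} class below is a hypothetical minimal version, not the Flink one):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

public class ErasureDemo {

    // A minimal "type hint": an anonymous subclass preserves its generic
    // superclass signature at runtime, which ordinary generic references lose.
    abstract static class TypeHint<T> {
        Type captured() {
            return ((ParameterizedType) getClass().getGenericSuperclass())
                    .getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        // A plain Map<String, Long> reference cannot reveal its parameters at
        // runtime, but the anonymous subclass below can.
        Type t = new TypeHint<Map<String, Long>>() { }.captured();
        System.out.println(t);
    }
}
```

Passing such a captured type to the serializer is what allows it to handle a {{Tuple3}} containing a user-defined class without falling back to Java serialization.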
[jira] [Updated] (FLINK-1582) SocketStream gets stuck when socket closes
[ https://issues.apache.org/jira/browse/FLINK-1582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi updated FLINK-1582: -- Labels: starter (was: ) > SocketStream gets stuck when socket closes > -- > > Key: FLINK-1582 > URL: https://issues.apache.org/jira/browse/FLINK-1582 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 0.8, 0.9 >Reporter: Márton Balassi > Labels: starter > > When the server side of the socket closes, the socket stream reader does not > terminate. When the socket is reinitiated it does not reconnect but just gets > stuck. > It would be nice to add options for the user to control how the reader should behave > when the socket is down: terminate immediately (good for testing and > examples) or wait a specified time - possibly forever. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326193#comment-14326193 ] Fay Beligianni commented on FLINK-1421: --- We already tried that workaround for testing purposes and indeed worked, but as you already said it is not a good solution.
[jira] [Commented] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326196#comment-14326196 ] Fay Beligianni commented on FLINK-1421: --- Hey Stefan, Thank you very much for the response! Indeed that was the problem. Regarding the Java serialization, yes it is not efficient but when we tried to use the Kryo serializer we were encountering problems with the tuples that we are streaming between the invokables. Specifically, because we are streaming Tuple3 elements, where UserDefinedClass is a custom implementation of the Samoa "ContentEvent" interface, we couldn't pass the TypeInformation of the custom object to the serializer, thus we had to use Java serialization. For sure though we will try to come up with a more efficient solution for that issue. P.S.: Thanks for the formatting hint!
[jira] [Issue Comment Deleted] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fay Beligianni updated FLINK-1421: -- Comment: was deleted (was: Hey Stefan, Thank you very much for the response! Indeed that was the problem. Regarding the Java serialization, yes it is not efficient but when we tried to use the Kryo serializer we were encountering problems with the tuples that we are streaming between the invokables. Specifically, because we are streaming Tuple3 elements, where UserDefinedClass is a custom implementation of the Samoa "ContentEvent" interface, we couldn't pass the TypeInformation of the custom object to the serializer, thus we had to use Java serialization. For sure though we will try to come up with a more efficient solution for that issue. P.S.: Thanks for the formatting hint! )
[jira] [Issue Comment Deleted] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fay Beligianni updated FLINK-1421: -- Comment: was deleted (was: Hey Stefan, Thank you very much for the response! Indeed that was the problem. Regarding the Java serialization, yes it is not efficient but when we tried to use the Kryo serializer we were encountering problems with the tuples that we are streaming between the invokables. Specifically, because we are streaming Tuple3 elements, where UserDefinedClass is a custom implementation of the Samoa "ContentEvent" interface, we couldn't pass the TypeInformation of the custom object to the serializer, thus we had to use Java serialization. For sure though we will try to come up with a more efficient solution for that issue. P.S.: Thanks for the formatting hint! )
[jira] [Comment Edited] (FLINK-1421) Implement a SAMOA Adapter for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326196#comment-14326196 ] Fay Beligianni edited comment on FLINK-1421 at 2/18/15 5:13 PM: Hey Stefan, Thank you very much for the response! Indeed that was the problem.\\ Regarding the Java serialization, yes it is not efficient but when we tried to use the Kryo serializer we were encountering problems with the tuples that we are streaming between the invokables.\\ Specifically, because we are streaming Tuple3 elements, where UserDefinedClass is a custom implementation of the Samoa "ContentEvent" interface, we couldn't pass the TypeInformation of the custom object to the serializer, thus we had to use Java serialization.\\ For sure though we will try to come up with a more efficient solution for that issue.\\ P.S.: Thanks for the formatting hint! was (Author: fobeligi): Hey Stefan, Thank you very much for the response! Indeed that was the problem. Regarding the Java serialization, yes it is not efficient but when we tried to use the Kryo serializer we were encountering problems with the tuples that we are streaming between the invokables. Specifically, because we are streaming Tuple3 elements, where UserDefinedClass is a custom implementation of the Samoa "ContentEvent" interface, we couldn't pass the TypeInformation of the custom object to the serializer, thus we had to use Java serialization. For sure though we will try to come up with a more efficient solution for that issue. P.S.: Thanks for the formatting hint! 
[jira] [Created] (FLINK-1582) SocketStream gets stuck when socket closes
Márton Balassi created FLINK-1582: - Summary: SocketStream gets stuck when socket closes Key: FLINK-1582 URL: https://issues.apache.org/jira/browse/FLINK-1582 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.8, 0.9 Reporter: Márton Balassi When the server side of the socket closes, the socket stream reader does not terminate. When the socket is reinitiated it does not reconnect but just gets stuck. It would be nice to add options for the user to control how the reader should behave when the socket is down: terminate immediately (good for testing and examples) or wait a specified time - possibly forever. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
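The options requested in FLINK-1582 amount to a bounded reconnect loop around the socket read. A standalone sketch, assuming a hypothetical retry policy (this is not the actual socket-source code; names and parameters are illustrative):

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class RetryConnect {

    // Retry a connection attempt up to maxRetries times (forever when
    // maxRetries < 0), waiting between tries; rethrow once retries run out.
    public static <T> T connectWithRetry(Callable<T> connect, int maxRetries,
                                         long waitMillis) throws Exception {
        int failures = 0;
        while (true) {
            try {
                return connect.call();
            } catch (IOException e) {
                failures++;
                if (maxRetries >= 0 && failures > maxRetries) {
                    throw e; // terminate immediately: good for tests and examples
                }
                Thread.sleep(waitMillis); // wait, then try to reconnect
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulate a server socket that comes back on the third attempt.
        int[] attempts = {0};
        String result = connectWithRetry(() -> {
            if (++attempts[0] < 3) {
                throw new IOException("connection refused");
            }
            return "connected";
        }, 5, 10);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

Setting maxRetries to 0 gives the "terminate immediately" behavior and a negative value the "wait forever" behavior the issue proposes.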
[jira] [Created] (FLINK-1583) TaskManager reregistration in case of a restart
Till Rohrmann created FLINK-1583: Summary: TaskManager reregistration in case of a restart Key: FLINK-1583 URL: https://issues.apache.org/jira/browse/FLINK-1583 Project: Flink Issue Type: Bug Reporter: Till Rohrmann Currently, the {{InstanceManager}} identifies {{Instance}}s based on their {{InstanceConnectionInfo}}. In case of a restarted {{TaskManager}} which tries to register anew at the {{JobManager}}, the {{InstanceManager}} can mistake this {{TaskManager}} for one that is already registered. This can lead to a corrupted state. We should identify {{TaskManager}}s based on some ID to distinguish distinct registration attempts of a restarted {{TaskManager}}. This will improve the system's stability. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
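The ID-based identification proposed in FLINK-1583 can be sketched with a small registry: a restarted worker reuses the same host/port but carries a fresh per-start UUID, so a stale incarnation can be told apart and replaced. This is a hypothetical sketch, not the actual {{InstanceManager}} code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class InstanceRegistry {

    // Connection info (e.g. "host:port") -> per-start instance ID.
    private final Map<String, UUID> byConnection = new HashMap<>();

    /** Registers an incarnation; returns true if it replaced a stale one. */
    public boolean register(String connectionInfo, UUID instanceId) {
        UUID previous = byConnection.put(connectionInfo, instanceId);
        return previous != null && !previous.equals(instanceId);
    }

    public static void main(String[] args) {
        InstanceRegistry registry = new InstanceRegistry();
        UUID firstStart = UUID.randomUUID();
        UUID restart = UUID.randomUUID();
        // Fresh registration, then a restart with the same connection info.
        System.out.println(registry.register("host1:6121", firstStart));
        System.out.println(registry.register("host1:6121", restart));
    }
}
```

Keying on the UUID rather than the connection info means a re-registration after a restart is treated as a new instance instead of being silently dropped as a duplicate.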
[GitHub] flink pull request: [FLINK-1461][api-extending] Add SortPartition ...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/381#issuecomment-74908224 any further comment on this PR?
[jira] [Commented] (FLINK-1461) Add sortPartition operator
[ https://issues.apache.org/jira/browse/FLINK-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326234#comment-14326234 ] ASF GitHub Bot commented on FLINK-1461: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/381#issuecomment-74908224 any further comment on this PR? > Add sortPartition operator > -- > > Key: FLINK-1461 > URL: https://issues.apache.org/jira/browse/FLINK-1461 > Project: Flink > Issue Type: New Feature > Components: Java API, Local Runtime, Optimizer, Scala API >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Minor > > A {{sortPartition()}} operator can be used to > * sort the input of a {{mapPartition()}} operator > * enforce a certain sorting of the input of a given operator of a program. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1444][api-extending] Add support for sp...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/379#issuecomment-74908763 @rmetzger thanks for the review. Will merge tomorrow if nobody raises a flag.
[jira] [Commented] (FLINK-1444) Add data properties for data sources
[ https://issues.apache.org/jira/browse/FLINK-1444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326240#comment-14326240 ] ASF GitHub Bot commented on FLINK-1444: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/379#issuecomment-74908763 @rmetzger thanks for the review. Will merge tomorrow if nobody raises a flag. > Add data properties for data sources > > > Key: FLINK-1444 > URL: https://issues.apache.org/jira/browse/FLINK-1444 > Project: Flink > Issue Type: New Feature > Components: Java API, JobManager, Optimizer >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Minor > > This issue proposes to add support for attaching data properties to data > sources. These data properties are defined with respect to input splits. > Possible properties are: > - partitioning across splits: all elements of the same key (combination) are > contained in one split > - sorting / grouping with splits: elements are sorted or grouped on certain > keys within a split > - key uniqueness: a certain key (combination) is unique for all elements of > the data source. This property is not defined wrt. input splits. > The optimizer can leverage this information to generate more efficient > execution plans. > The InputFormat will be responsible to generate input splits such that the > promised data properties are actually in place. Otherwise, the program will > produce invalid results. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: Remove extra space after open parenthesis in I...
GitHub user hsaputra opened a pull request: https://github.com/apache/flink/pull/416 Remove extra space after open parenthesis in InstanceConnectionInfo#toString Small update to remove extra space after open parenthesis in InstanceConnectionInfo#toString to be consistent with other messages and toString calls. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hsaputra/flink fix_extra_space_in_InstanceConnectionInfo_tostring Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/416.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #416 commit cc175ae2f3e3a2e7b00ed1207f215487cdc38c2f Author: Henry Saputra Date: 2015-02-18T18:11:38Z Remove extra space after open parenthesis in InstanceConnectionInfo#toString. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1483) Temporary channel files are not properly deleted when Flink is terminated
[ https://issues.apache.org/jira/browse/FLINK-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326399#comment-14326399 ] ASF GitHub Bot commented on FLINK-1483: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/417 [FLINK-1483] IOManager puts temp files in dedicated directory and cleans up on shutdown You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink ioman_cleanup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/417.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #417 commit 53f5f04bd0151089a1b8f1f2bd593c6b4625b7be Author: Stephan Ewen Date: 2015-02-18T14:03:25Z [FLINK-1483] IOManager puts temp files in dedicated directory and removes that on shutdown > Temporary channel files are not properly deleted when Flink is terminated > - > > Key: FLINK-1483 > URL: https://issues.apache.org/jira/browse/FLINK-1483 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 0.8, 0.9 >Reporter: Till Rohrmann >Assignee: Ufuk Celebi > > The temporary channel files are not properly deleted if the IOManager does > not shut down properly. This can be the case when the TaskManagers are > terminated by Flink's shell scripts. > A solution could be to store all channel files of one TaskManager in a > uniquely identifiable directory and to register a shutdown hook which deletes > this file upon termination. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1483] IOManager puts temp files in dedi...
GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/417

    [FLINK-1483] IOManager puts temp files in dedicated directory and cleans up on shutdown

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink ioman_cleanup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/417.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #417

commit 53f5f04bd0151089a1b8f1f2bd593c6b4625b7be
Author: Stephan Ewen
Date: 2015-02-18T14:03:25Z

    [FLINK-1483] IOManager puts temp files in dedicated directory and removes that on shutdown
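As a rough sketch of the directory scheme this pull request describes (each IOManager instance creates a uniquely named subfolder per configured temp dir and registers a JVM shutdown hook that deletes it), the following standalone class illustrates the idea. This is not the actual Flink source; the class and method names here are made up for illustration, and the cleanup only handles a flat directory of files.

```java
import java.io.File;
import java.util.UUID;

// Illustrative sketch of per-instance spill directories with shutdown-hook cleanup.
class SpillDirManager {

    private final File[] paths;

    SpillDirManager(String[] tempDirs) {
        if (tempDirs == null || tempDirs.length == 0) {
            throw new IllegalArgumentException("The temporary directories must not be null or empty.");
        }
        this.paths = new File[tempDirs.length];
        for (int i = 0; i < tempDirs.length; i++) {
            // a UUID in the name makes the directory unique to this process,
            // so cleanup can target exactly this instance's files
            File storageDir = new File(tempDirs[i], "flink-io-" + UUID.randomUUID());
            if (!storageDir.exists() && !storageDir.mkdirs()) {
                throw new RuntimeException("Could not create storage directory: " + storageDir);
            }
            paths[i] = storageDir;
        }
        // ensure cleanup runs even when the process is terminated externally (e.g. SIGTERM)
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown, "spill dir cleanup"));
    }

    // Idempotent: safe to call explicitly and again from the shutdown hook.
    void shutdown() {
        for (File path : paths) {
            if (path != null && path.exists()) {
                File[] children = path.listFiles();
                if (children != null) {
                    for (File f : children) {
                        f.delete(); // this sketch assumes a flat directory of spill files
                    }
                }
                path.delete();
            }
        }
    }

    File[] getPaths() {
        return paths;
    }
}
```

A caller would construct the manager with the configured temp directories and may call shutdown() explicitly; the hook covers the case where it never does.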
[jira] [Commented] (FLINK-1483) Temporary channel files are not properly deleted when Flink is terminated
[ https://issues.apache.org/jira/browse/FLINK-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326411#comment-14326411 ]

ASF GitHub Bot commented on FLINK-1483:
---------------------------------------

Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/417#discussion_r24931138

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java ---
@@ -52,26 +57,88 @@
 	/**
 	 * Constructs a new IOManager.
 	 *
-	 * @param paths
-	 *        the basic directory paths for files underlying anonymous channels.
+	 * @param tempDirs The basic directories for files underlying anonymous channels.
 	 */
-	protected IOManager(String[] paths) {
-		this.paths = paths;
+	protected IOManager(String[] tempDirs) {
+		if (tempDirs == null || tempDirs.length == 0) {
+			throw new IllegalArgumentException("The temporary directories must not be null or empty.");
+		}
+
 		this.random = new Random();
 		this.nextPath = 0;
+
+		this.paths = new File[tempDirs.length];
+		for (int i = 0; i < tempDirs.length; i++) {
+			File baseDir = new File(tempDirs[i]);
+			String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString());
+			File storageDir = new File(baseDir, subfolder);
+
+			if (!storageDir.exists() && !storageDir.mkdirs()) {
+				throw new RuntimeException(
+					"Could not create storage directory for IOManager: " + storageDir.getAbsolutePath());
+			}
+			paths[i] = storageDir;
+			LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath());
+		}
+
+		this.shutdownHook = new Thread("I/O manager shutdown hook") {
+			@Override
+			public void run() {
+				shutdown();
+			}
+		};
+		Runtime.getRuntime().addShutdownHook(this.shutdownHook);
 	}

 	/**
-	 * Close method, marks the I/O manager as closed.
+	 * Close method, marks the I/O manager as closed
+	 * and removes all temporary files.
 	 */
-	public abstract void shutdown();
+	public void shutdown() {
+		// remove all of our temp directories
+		for (File path : paths) {
+			try {
+				if (path != null) {
+					if (path.exists()) {
+						FileUtils.deleteDirectory(path);
+						LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
+					}
+				}
+			} catch (Throwable t) {
+				LOG.error("IOManager failed to properly clean up temp file directory: " + path, t);
+			}
+		}
+
+		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
+		if (shutdownHook != Thread.currentThread()) {
+			try {
+				Runtime.getRuntime().removeShutdownHook(shutdownHook);
+			}
+			catch (IllegalStateException e) {
+				// race, JVM is in shutdown already, we can safely ignore this
+			}
+			catch (Throwable t) {
+				LOG.warn("Exception while unregistering IOManager's shutdown hook.", t);
+			}
+		}
+	}

 	/**
 	 * Utility method to check whether the IO manager has been properly shut down.
+	 * For this base implementation, this means that all files have been removed.
 	 *
 	 * @return True, if the IO manager has properly shut down, false otherwise.
 	 */
-	public abstract boolean isProperlyShutDown();
+	public boolean isProperlyShutDown() {
+		for (File path : paths) {
+			if (path != null) {
--- End diff --

    Would this be easier to read with the check {{if (path != null && path.exists())}}?

> Temporary channel files are not properly deleted when Flink is terminated
> -------------------------------------------------------------------------
>
>                 Key: FLINK-1483
>                 URL: https://issues.apache.org/jira/browse/FLINK-1483
>             Project: Flink
>          Issue
[GitHub] flink pull request: [FLINK-1483] IOManager puts temp files in dedi...
Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/417#discussion_r24931138

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java ---
@@ -52,26 +57,88 @@
 	/**
 	 * Constructs a new IOManager.
 	 *
-	 * @param paths
-	 *        the basic directory paths for files underlying anonymous channels.
+	 * @param tempDirs The basic directories for files underlying anonymous channels.
 	 */
-	protected IOManager(String[] paths) {
-		this.paths = paths;
+	protected IOManager(String[] tempDirs) {
+		if (tempDirs == null || tempDirs.length == 0) {
+			throw new IllegalArgumentException("The temporary directories must not be null or empty.");
+		}
+
 		this.random = new Random();
 		this.nextPath = 0;
+
+		this.paths = new File[tempDirs.length];
+		for (int i = 0; i < tempDirs.length; i++) {
+			File baseDir = new File(tempDirs[i]);
+			String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString());
+			File storageDir = new File(baseDir, subfolder);
+
+			if (!storageDir.exists() && !storageDir.mkdirs()) {
+				throw new RuntimeException(
+					"Could not create storage directory for IOManager: " + storageDir.getAbsolutePath());
+			}
+			paths[i] = storageDir;
+			LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath());
+		}
+
+		this.shutdownHook = new Thread("I/O manager shutdown hook") {
+			@Override
+			public void run() {
+				shutdown();
+			}
+		};
+		Runtime.getRuntime().addShutdownHook(this.shutdownHook);
 	}

 	/**
-	 * Close method, marks the I/O manager as closed.
+	 * Close method, marks the I/O manager as closed
+	 * and removes all temporary files.
 	 */
-	public abstract void shutdown();
+	public void shutdown() {
+		// remove all of our temp directories
+		for (File path : paths) {
+			try {
+				if (path != null) {
+					if (path.exists()) {
+						FileUtils.deleteDirectory(path);
+						LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
+					}
+				}
+			} catch (Throwable t) {
+				LOG.error("IOManager failed to properly clean up temp file directory: " + path, t);
+			}
+		}
+
+		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
+		if (shutdownHook != Thread.currentThread()) {
+			try {
+				Runtime.getRuntime().removeShutdownHook(shutdownHook);
+			}
+			catch (IllegalStateException e) {
+				// race, JVM is in shutdown already, we can safely ignore this
+			}
+			catch (Throwable t) {
+				LOG.warn("Exception while unregistering IOManager's shutdown hook.", t);
+			}
+		}
+	}

 	/**
 	 * Utility method to check whether the IO manager has been properly shut down.
+	 * For this base implementation, this means that all files have been removed.
 	 *
 	 * @return True, if the IO manager has properly shut down, false otherwise.
 	 */
-	public abstract boolean isProperlyShutDown();
+	public boolean isProperlyShutDown() {
+		for (File path : paths) {
+			if (path != null) {
--- End diff --

    Would this be easier to read with the check {{if (path != null && path.exists())}}?
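The guard in the diff above (skipping removeShutdownHook when shutdown() is itself running as the hook, and tolerating a JVM that is already shutting down) can be isolated into a small standalone sketch. Names here are illustrative, not Flink's:

```java
// Sketch of the shutdown-hook self-removal pattern from the patch discussed above.
class HookedResource {

    private final Thread shutdownHook;
    private volatile boolean closed;

    HookedResource() {
        this.shutdownHook = new Thread(this::shutdown, "resource shutdown hook");
        // the hook guarantees cleanup even if shutdown() is never called explicitly
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    void shutdown() {
        closed = true; // release the actual resources here

        // Do not try to remove the hook from within the hook itself: the JVM is
        // already shutting down at that point and removeShutdownHook would throw.
        if (shutdownHook != Thread.currentThread()) {
            try {
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            } catch (IllegalStateException e) {
                // race: the JVM entered shutdown between the check and the call; safe to ignore
            }
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```

Removing the hook on explicit shutdown matters because a registered hook keeps the object reachable until JVM exit, which is the resource-leak concern the comment in the patch refers to.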
[jira] [Commented] (FLINK-1483) Temporary channel files are not properly deleted when Flink is terminated
[ https://issues.apache.org/jira/browse/FLINK-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14326429#comment-14326429 ]

ASF GitHub Bot commented on FLINK-1483:
---------------------------------------

Github user hsaputra commented on the pull request:

    https://github.com/apache/flink/pull/417#issuecomment-74930615

    Since the IOManager adds a shutdown hook to clean up the files, should IOManagerAsync#isProperlyShutDown call super.isProperlyShutDown?

> Temporary channel files are not properly deleted when Flink is terminated
> -------------------------------------------------------------------------
>
>                 Key: FLINK-1483
>                 URL: https://issues.apache.org/jira/browse/FLINK-1483
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 0.8, 0.9
>            Reporter: Till Rohrmann
>            Assignee: Ufuk Celebi
>
> The temporary channel files are not properly deleted if the IOManager does
> not shut down properly. This can be the case when the TaskManagers are
> terminated by Flink's shell scripts.
> A solution could be to store all channel files of one TaskManager in a
> uniquely identifiable directory and to register a shutdown hook which deletes
> this directory upon termination.
[GitHub] flink pull request: [FLINK-1483] IOManager puts temp files in dedi...
Github user hsaputra commented on the pull request:

    https://github.com/apache/flink/pull/417#issuecomment-74930615

    Since the IOManager adds a shutdown hook to clean up the files, should IOManagerAsync#isProperlyShutDown call super.isProperlyShutDown?
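The concern in the comment above is that a subclass override of isProperlyShutDown could mask the base class's file-cleanup check. One common way to address this, sketched here with hypothetical class names (not Flink's actual IOManager/IOManagerAsync code), is for the override to combine its own condition with the base check via super:

```java
// Hypothetical sketch: subclass override delegates to the base class check
// so that neither the subclass's nor the base class's condition is lost.
class BaseManager {
    protected boolean filesRemoved;

    boolean isProperlyShutDown() {
        return filesRemoved; // base check: temp files were cleaned up
    }
}

class AsyncManager extends BaseManager {
    private boolean threadsStopped;

    @Override
    boolean isProperlyShutDown() {
        // combine the subclass condition with the inherited one
        return threadsStopped && super.isProperlyShutDown();
    }

    // illustrative state transitions for the sketch
    void stopThreads() { threadsStopped = true; }
    void removeFiles() { filesRemoved = true; }
}
```

With this pattern, the async variant reports a proper shutdown only when its own worker threads have stopped and the base class confirms the temp directories were removed.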