[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708407#comment-16708407 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys closed pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala index a1568c29b61..804acfcf045 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala @@ -21,9 +21,8 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{AsyncDataStream => JavaAsyncDataStream} -import org.apache.flink.streaming.api.functions.async.{ResultFuture => JavaResultFuture} -import org.apache.flink.streaming.api.functions.async.{AsyncFunction => JavaAsyncFunction} -import org.apache.flink.streaming.api.scala.async.{AsyncFunction, JavaResultFutureWrapper, ResultFuture} +import org.apache.flink.streaming.api.functions.async.{AsyncFunction => JavaAsyncFunction, ResultFuture => JavaResultFuture} +import org.apache.flink.streaming.api.scala.async._ import org.apache.flink.util.Preconditions import scala.concurrent.duration.TimeUnit @@ -67,14 +66,7 @@ object AsyncDataStream { capacity: Int) : DataStream[OUT] = { -val javaAsyncFunction = 
new JavaAsyncFunction[IN, OUT] { - override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { -asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture)) - } - override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { -asyncFunction.timeout(input, new JavaResultFutureWrapper(resultFuture)) - } -} +val javaAsyncFunction = wrapAsJavaAsyncFunction(asyncFunction) val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]] @@ -197,14 +189,7 @@ object AsyncDataStream { capacity: Int) : DataStream[OUT] = { -val javaAsyncFunction = new JavaAsyncFunction[IN, OUT] { - override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { -asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper[OUT](resultFuture)) - } - override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = { -asyncFunction.timeout(input, new JavaResultFutureWrapper[OUT](resultFuture)) - } -} +val javaAsyncFunction = wrapAsJavaAsyncFunction(asyncFunction) val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]] @@ -229,12 +214,11 @@ object AsyncDataStream { * @return the resulting stream containing the asynchronous results */ def orderedWait[IN, OUT: TypeInformation]( -input: DataStream[IN], -asyncFunction: AsyncFunction[IN, OUT], -timeout: Long, -timeUnit: TimeUnit) - : DataStream[OUT] = { - + input: DataStream[IN], + asyncFunction: AsyncFunction[IN, OUT], + timeout: Long, + timeUnit: TimeUnit) +: DataStream[OUT] = { orderedWait(input, asyncFunction, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY) } @@ -253,12 +237,12 @@ object AsyncDataStream { * @return the resulting stream containing the asynchronous results */ def orderedWait[IN, OUT: TypeInformation]( -input: DataStream[IN], -timeout: Long, -timeUnit: TimeUnit, -capacity: Int) ( -asyncFunction: (IN, ResultFuture[OUT]) => Unit) - : DataStream[OUT] = { + input: DataStream[IN], + timeout: Long, + timeUnit: TimeUnit, + capacity: Int) ( + 
asyncFunction: (IN, ResultFuture[OUT]) => Unit) +: DataStream[OUT] = { Preconditions.checkNotNull(asyncFunction) @@ -293,12 +277,29 @@ object AsyncDataStream { * @return the resulting stream containing the asynchronous results */ def orderedWait[IN, OUT: TypeInformation]( -input: DataStream[IN], -timeout: Long, -timeUnit: TimeUnit) ( -asyncFunction: (IN, ResultFuture[OUT]) => Unit) - : DataStream[OUT] = { + input: DataStream[IN], + timeout: Long, + timeUnit: TimeUnit) ( + asyncFunction: (IN, ResultFuture[OUT]) => Unit) +: DataStream[OUT] = { orderedWait(input, timeout, tim
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708401#comment-16708401 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on issue #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#issuecomment-444019083 Thanks for the contribution @Clark. Merging. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is whether there are any blocking issues preventing this > feature. [~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708090#comment-16708090 ] ASF GitHub Bot commented on FLINK-6756: --- Clark commented on issue #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#issuecomment-443943474 cc @dawidwys, changes in RichAsyncFunction have been reverted. Thank you for your review. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is whether there are any blocking issues preventing this > feature. [~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707195#comment-16707195 ] ASF GitHub Bot commented on FLINK-6756: --- Clark commented on issue #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#issuecomment-443710587 cc @dawidwys This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is whether there are any blocking issues preventing this > feature. [~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16707207#comment-16707207 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on issue #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#issuecomment-443711889 Just copying my message as a top-level comment: > Sorry for the late response. I wouldn't change the visibility scope for RichAsyncFunctionIterationRuntimeContext. I think accessing the RuntimeContext is enough in this case (which you do by accessing the getNumberOfParallelSubtasks). > > Please revert that change, and then I think we will be ready to merge this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is whether there are any blocking issues preventing this > feature. [~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706835#comment-16706835 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r238178778 ## File path: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ## @@ -135,3 +158,19 @@ class MyAsyncFunction extends AsyncFunction[Int, Int] { resultFuture.complete(Seq(input * 3)) } } + +class MyRichAsyncFunction extends RichAsyncFunction[Int, String] { + override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit = { Review comment: Sorry for the late response. I wouldn't change the visibility scope for `RichAsyncFunctionIterationRuntimeContext`. I think accessing the `RuntimeContext` is enough in this case (which you do by accessing the `getNumberOfParallelSubtasks`). Please revert that change, and then I think we will be ready to merge this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. 
> I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is whether there are any blocking issues preventing this > feature. [~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705624#comment-16705624 ] ASF GitHub Bot commented on FLINK-6756: --- Clark commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r238051723 ## File path: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ## @@ -135,3 +158,19 @@ class MyAsyncFunction extends AsyncFunction[Int, Int] { resultFuture.complete(Seq(input * 3)) } } + +class MyRichAsyncFunction extends RichAsyncFunction[Int, String] { + override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit = { Review comment: Or a public static inner class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is whether there are any blocking issues preventing this > feature. [~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705621#comment-16705621 ] ASF GitHub Bot commented on FLINK-6756: --- Clark commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r238051554 ## File path: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ## @@ -135,3 +158,19 @@ class MyAsyncFunction extends AsyncFunction[Int, Int] { resultFuture.complete(Seq(input * 3)) } } + +class MyRichAsyncFunction extends RichAsyncFunction[Int, String] { + override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit = { Review comment: I was also thinking about that. But here I want to verify that the runtime context I get is a RichAsyncFunctionRuntimeContext, which is a private inner class of RichAsyncFunction. So I cannot use isInstanceOf to verify it. Perhaps I could move the inner class out of RichAsyncFunction and make it accessible to the test, so I can verify the RuntimeContext. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. 
> I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is whether there are any blocking issues preventing this > feature. [~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704601#comment-16704601 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r237829048 ## File path: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ## @@ -135,3 +158,19 @@ class MyAsyncFunction extends AsyncFunction[Int, Int] { resultFuture.complete(Seq(input * 3)) } } + +class MyRichAsyncFunction extends RichAsyncFunction[Int, String] { + override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit = { Review comment: Also, could you change `MyRichAsyncFunction` so that it is actually executed in an async way? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is whether there are any blocking issues preventing this > feature. 
[~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704600#comment-16704600 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r237784773 ## File path: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/util/ScalaRichAsyncFunctionWrapper.scala ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.async.util + +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction => JRichAsyncFunction} +import org.apache.flink.streaming.api.scala.async.{JavaResultFutureWrapper, RichAsyncFunction} + +/** + * A wrapper function that exposes a Scala RichAsyncFunction as a Java Rich Async Function. 
+ * + * The Scala and Java RichAsyncFunctions differ in their type of "ResultFuture" + * - Scala RichAsyncFunction: org.apache.flink.streaming.api.scala.async.ResultFuture + * - Java RichAsyncFunction: org.apache.flink.streaming.api.functions.async.ResultFuture + */ +final class ScalaRichAsyncFunctionWrapper[IN, OUT](func: RichAsyncFunction[IN, OUT]) + extends JRichAsyncFunction[IN, OUT]{ + + override def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit = { +func.asyncInvoke(input, new JavaResultFutureWrapper[OUT](resultFuture)) + } + + override def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = { +func.timeout(input, new JavaResultFutureWrapper[OUT](resultFuture)) + } + + override def open(parameters: Configuration): Unit = { +super.open(parameters) +func.open(parameters) + } + + override def close(): Unit = { +super.close() Review comment: Same as above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. 
[~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704603#comment-16704603 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r237784333 ## File path: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/util/ScalaRichAsyncFunctionWrapper.scala ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.async.util Review comment: Move one level up. See no reason to introduce separate package for a single class. We have the `JavaResultFutureWrapper` there already, which is quite similar. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704597#comment-16704597 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r237784643 ## File path: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/util/ScalaRichAsyncFunctionWrapper.scala ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.async.util + +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction => JRichAsyncFunction} +import org.apache.flink.streaming.api.scala.async.{JavaResultFutureWrapper, RichAsyncFunction} + +/** + * A wrapper function that exposes a Scala RichAsyncFunction as a Java Rich Async Function. 
+ * + * The Scala and Java RichAsyncFunctions differ in their type of "ResultFuture" + * - Scala RichAsyncFunction: org.apache.flink.streaming.api.scala.async.ResultFuture + * - Java RichAsyncFunction: org.apache.flink.streaming.api.functions.async.ResultFuture + */ +final class ScalaRichAsyncFunctionWrapper[IN, OUT](func: RichAsyncFunction[IN, OUT]) + extends JRichAsyncFunction[IN, OUT]{ + + override def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit = { +func.asyncInvoke(input, new JavaResultFutureWrapper[OUT](resultFuture)) + } + + override def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = { +func.timeout(input, new JavaResultFutureWrapper[OUT](resultFuture)) + } + + override def open(parameters: Configuration): Unit = { +super.open(parameters) Review comment: remove the `super.open(parameters)` it actually does nothing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. 
[~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704604#comment-16704604 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r237829126 ## File path: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ## @@ -33,6 +33,7 @@ import scala.concurrent.{ExecutionContext, Future} object AsyncDataStreamITCase { val timeout = 1000L private var testResult: mutable.ArrayBuffer[Int] = _ + private var runtimeTestResult: mutable.ArrayBuffer[String] = _ Review comment: Try to reuse the previous buffer. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704599#comment-16704599 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r237783551 ## File path: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala ## @@ -301,4 +287,21 @@ object AsyncDataStream { orderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction) } + + private def wrapAsJavaAsyncFunction[IN, OUT: TypeInformation]( +asyncFunction: AsyncFunction[IN, OUT]) + : JavaAsyncFunction[IN, OUT] = asyncFunction match { Review comment: Add indentation. In scala the guidelines are to format code as follows: ``` private def wrapAsJavaAsyncFunction[IN, OUT: TypeInformation]( asyncFunction: AsyncFunction[IN, OUT]) : JavaAsyncFunction[IN, OUT] = asyncFunction match { case richAsyncFunction: RichAsyncFunction[IN, OUT] => ``` so that the colon has the same indentation as the code inside function. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. 
> I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704598#comment-16704598 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r237826877 ## File path: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala ## @@ -135,3 +158,19 @@ class MyAsyncFunction extends AsyncFunction[Int, Int] { resultFuture.complete(Seq(input * 3)) } } + +class MyRichAsyncFunction extends RichAsyncFunction[Int, String] { + override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit = { Review comment: Could you rework the example so that it uses a different method of the `RuntimeContext`, one that does not throw an exception? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. 
[~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704602#comment-16704602 ] ASF GitHub Bot commented on FLINK-6756: --- dawidwys commented on a change in pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#discussion_r237784432 ## File path: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/util/ScalaRichAsyncFunctionWrapper.scala ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.async.util + +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction => JRichAsyncFunction} +import org.apache.flink.streaming.api.scala.async.{JavaResultFutureWrapper, RichAsyncFunction} + +/** + * A wrapper function that exposes a Scala RichAsyncFunction as a Java Rich Async Function. 
+ * + * The Scala and Java RichAsyncFunctions differ in their type of "ResultFuture" + * - Scala RichAsyncFunction: org.apache.flink.streaming.api.scala.async.ResultFuture + * - Java RichAsyncFunction: org.apache.flink.streaming.api.functions.async.ResultFuture + */ +final class ScalaRichAsyncFunctionWrapper[IN, OUT](func: RichAsyncFunction[IN, OUT]) Review comment: Annotate with `@Internal` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16697030#comment-16697030 ] ASF GitHub Bot commented on FLINK-6756: --- Clark opened a new pull request #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168 ## What is the purpose of the change Provide Rich AsyncFunction to Scala API suite ## Brief change log - Provide RichAsyncFunction for Scala API suite - Use a ScalaRichAsyncFunctionWrapper to wrap it into a Java RichAsyncFunction ## Verifying this change The Java Async runtime context has been tested in RichAsyncFunctionTest. And the access to Async runtime context is now covered in method testRichAsyncFunctionRuntimeContext of AsyncDataStreamITCase. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > Labels: pull-request-available > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. 
I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690910#comment-16690910 ] Dawid Wysakowicz commented on FLINK-6756: - Hi [~dangdangdang] thanks for your interest in taking this over. Feel free to do it. I think we should follow the approach described in this issue. > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690520#comment-16690520 ] Shimin Yang commented on FLINK-6756: Well, I can take this over. > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689543#comment-16689543 ] Antoine Philippot commented on FLINK-6756: -- Hi [~dawidwys], sorry but I'm not working on Flink anymore :( and I won't be able to reapply the patch on the current master. > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688148#comment-16688148 ] Dawid Wysakowicz commented on FLINK-6756: - Hi [~aphilippot], I just wanted to ask if you are still interested in working on this issue. I think your solution was close to being finished; we could discuss it further if you open a PR. The {{setRuntimeContext}} method has to be invoked twice (once on the wrapper itself and once on the wrapped function) in order for the Scala version to be able to call {{getRuntimeContext}}. > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428676#comment-16428676 ] Elias Levy commented on FLINK-6756: --- Just ran into this, any progress? > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339101#comment-16339101 ] Antoine Philippot commented on FLINK-6756: -- Hi [~aljoscha] and [~spi-x-i], I have started to implement Aljoscha's idea. This is my current commit: [https://github.com/apache/flink/commit/96684058d4a209d39d7fb1667beb7f083d215b58]. My only concern is about overriding setRuntimeContext [https://github.com/apache/flink/commit/96684058d4a209d39d7fb1667beb7f083d215b58#diff-e79f7947628990ee5298af04bca1c399R17] compared to ScalaProcessWindowFunctionWrapper [https://github.com/apache/flink/blob/master/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala#L86]. I don't understand why both super.setRuntimeContext(t) and rfunc.setRuntimeContext(t) are called. What do you think about that? > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309739#comment-16309739 ] Aljoscha Krettek commented on FLINK-6756: - This wrapper probably needs to be implemented similarly to {{ScalaProcessWindowFunctionWrapper}}. > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028506#comment-16028506 ] Andrea Spina commented on FLINK-6756: - Ok, thank you. I'll provide an implementation of it asap. > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028170#comment-16028170 ] Till Rohrmann commented on FLINK-6756: -- I think there is nothing forbidding the implementation of a rich variant of the {{AsyncFunction}} for the Scala API. Thus, it would be great if you could tackle the issue [~spi-x-i]. > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)