Hi Till,
Thanks for your reply. I managed to run some experiments; some worked and
some did not. I hope you can give me a bit more insight.
Following your suggestion, I implemented a `RichAsyncFunction` with transient
fields (code below), but it fails to compile with this error:
```
Class 'RichAsyncHttpClient' must either be declared abstract or implement
abstract member 'executionContext: ExecutionContextExecutor' in
‘com.parcelperform.util.RichAsyncHttpClient’
```
```
class RichAsyncHttpClient()
    extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
  PPLogger.getActivityLogger.info("###########INIT ------------------- ")
  @transient implicit var materializer: ActorMaterializer
  @transient implicit var system: ActorSystem
  @transient implicit var executionContext: ExecutionContextExecutor

  override def asyncInvoke(
      input: Shipment,
      resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
    val resultFutureRequested: Future[HttpResponse] =
      Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
    resultFutureRequested.onComplete {
      case Success(res) => {
        resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
      }
      case Failure(x) => {
        resultFuture.complete(Iterable(Either(x)).asJavaCollection)
      }
    }
  }

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    system = ActorSystem("my-system")
    executionContext = system.dispatcher
    materializer = ActorMaterializer()
  }
}
```
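Side note on the compile error: in Scala, a `var` declared in a class body without a right-hand side is an abstract member, which would explain the message above. Below is a minimal sketch of the usual workaround. It uses a hypothetical `Lifecycle` base class in place of Flink's `RichAsyncFunction` (so it needs only the standard library), and `= _` to give the field a default value that `open()` later replaces:

```scala
import scala.concurrent.ExecutionContext

// Hypothetical stand-in for a rich function with an open() hook; not Flink API.
abstract class Lifecycle extends Serializable {
  def open(): Unit = ()
}

class Client extends Lifecycle {
  // `var ec: ExecutionContext` alone would declare an abstract member.
  // `= _` initializes the field to its default value (null for reference
  // types), which makes the class concrete.
  @transient implicit var ec: ExecutionContext = _

  // Build the real value once, where the function is actually run.
  override def open(): Unit = {
    ec = ExecutionContext.global
  }
}
```

Until `open()` has run the field is still `null`, so anything that needs it must only be called afterwards.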
Also, using that class gives me an error. I guess it is a Java/Scala issue: in
the Flink docs, the Java examples use `RichAsyncFunction` and the Scala
examples use `AsyncFunction`:
```
// AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(), 5, TimeUnit.SECONDS, 2).print()
// <= ###### error: Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT],
//    actual: RichAsyncHttpClient
```
###
So I went back to my current code, made the fields transient and moved them
into the constructor:
```
class AsyncHttpClient(args: Array[String] = Array())
    extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
  @transient implicit lazy val system = {
    PPLogger.getActivityLogger.info("###########INIT ------------------- ")
    ActorSystem("my-system")
  }
  @transient implicit lazy val executionContext = {
    system.dispatcher
  }
  @transient implicit lazy val materializer: ActorMaterializer = {
    PPLogger.getActivityLogger.info("###########DONE ------------------- ")
    ActorMaterializer()
  }

  override def asyncInvoke(
      input: Shipment,
      resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
    // PPLogger.getActivityLogger.info("###########INIT ------------------- ")
    // implicit val system = ActorSystem("my-system")
    // implicit val executionContext = system.dispatcher
    // implicit val materializer: ActorMaterializer = ActorMaterializer()
    // PPLogger.getActivityLogger.info("###########DONE ------------------- ")
    val resultFutureRequested: Future[HttpResponse] =
      Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
    resultFutureRequested.onComplete {
      case Success(res) => {
        resultFuture.complete(Iterable(Right(res.entity)))
      }
      case Failure(x) => {
        resultFuture.complete(Iterable(Left(x)))
      }
    }
  }

  override def timeout(
      input: Shipment,
      resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
    resultFuture.complete(
      Iterable(Left(new TimeoutException("Async function call has timed out."))))
  }
}
```
And it runs OK. The log was printed only once.
I am still asking about this because I have not understood the statement "That
way you only create the `ActorMaterializer` on the `TaskManager` where the
operator is executed and solve the problem of serializability". I thought all
the code is executed inside the TaskManager?
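Note on the serializability point: the function object is constructed in the client program, serialized, and then deserialized on the TaskManager, so any field built in the constructor has to survive that trip. Below is a minimal sketch of why a `@transient lazy val` does. It uses plain Java serialization and hypothetical `Worker` and `RoundTrip` names (neither is Flink API); a bare `Object` stands in for a non-serializable resource such as an `ActorSystem`:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical function object. java.lang.Object is not Serializable, so a
// non-transient field of this kind would make writeObject fail.
final class Worker extends Serializable {
  // Skipped when the object is serialized; built on first access in
  // whichever JVM touches it.
  @transient lazy val resource: AnyRef = new Object
}

object RoundTrip {
  // Serialize and deserialize in memory, roughly what happens when a
  // function object is shipped from the client to a worker.
  def copy[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Calling `RoundTrip.copy(new Worker)` succeeds even if `resource` was already forced on the original, and the copy builds its own `resource` the first time it is read. That would match the log above being printed only once per deserialized instance.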
Thanks for being patient with me this far.
Andy
> On Apr 11, 2019, at 7:12 PM, Till Rohrmann <[email protected]> wrote:
>
> Hi Andy,
>
> without being an expert of Akka's http client, I think you should not create
> a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would
> recommend you instead is to implement a `RichAsyncFunction` with a transient
> field for `ActorMaterializer` which you initialize in the
> `RichAsyncFunction#open` method. That way you only create the
> `ActorMaterializer` on the `TaskManager` where the operator is executed and
> solve the problem of serializability and you make it much more efficient
> because you don't create a new `ActorSystem` for every request.
>
> Cheers,
> Till
>
> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <[email protected]> wrote:
> Hi guys,
>
> I’m trying to decide which HTTP client to use with Flink. So far I have tested
> scalaj and the Akka HTTP client, and both work OK in our current dev environment.
> scalaj is pretty straightforward, since it just makes an HTTP request with a
> timeout.
>
> The Akka HTTP client is a bit more complicated (I’m new to Scala and all),
> so I’m asking whether I am doing it right by creating an AsyncFunction like this:
> ```
> class AsyncHttpClient(args: Array[String] = Array())
>     extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>
>   override def asyncInvoke(
>       input: Shipment,
>       resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>
>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>     implicit val system = ActorSystem("my-system")
>     implicit val executionContext = system.dispatcher
>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>     val resultFutureRequested: Future[HttpResponse] =
>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>
>     resultFutureRequested.onComplete {
>       case Success(res) => {
>         resultFuture.complete(Iterable(Right(res.entity)))
>       }
>       case Failure(x) => {
>         resultFuture.complete(Iterable(Left(x)))
>       }
>     }
>   }
>
>   override def timeout(
>       input: Shipment,
>       resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>     resultFuture.complete(
>       Iterable(Left(new TimeoutException("Async function call has timed out."))))
>   }
> }
> ```
> I notice that I have to create a bunch of implicit variables inside the
> asyncInvoke method. I’m not sure whether I’m doing it right or just adding
> overhead. I did try to initialize them in the constructor of the class, but
> the compiler just threw a bunch of "Not implemented Serializer" errors.
>
> My lib:
> "com.typesafe.akka" %% "akka-http" % "10.1.8",
> "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>
> My flink:
> scala 2.12
> flink 1.70
>
>
>
> Any reply is appreciated!
>
> Thanks a bunch
>
> Andy,
>
>
>