Hi Andy,

you can do some micro benchmarks where you instantiate your AsyncHttpClient
and call the invoke method. But it would be better to benchmark it end-to-end
by running it on a cluster with a realistic workload, ideally one you also
expect to see in production.

Cheers,
Till
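A micro-benchmark along those lines can be sketched in plain Scala without Flink or Akka. This is a minimal sketch that assumes a fixed thread pool is a fair stand-in for a heavyweight resource such as an `ActorSystem`. `InitCostBenchmark`, `perCall`, `reused` and `averageMicros` are hypothetical names. The sketch compares creating the resource on every call, as in the original `asyncInvoke`, with creating it once, as in `open`. Numbers from a hand-rolled loop like this are only indicative because of JIT warm-up and GC. A harness such as JMH is more trustworthy, and an end-to-end run with a realistic workload remains the better test.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object InitCostBenchmark {
  // Hypothetical stand-in for an HTTP request: a small async computation.
  def request(i: Int)(implicit ec: ExecutionContext): Future[Int] = Future(i * 2)

  // Variant 1: build and tear down a heavyweight resource on every call,
  // similar to creating an ActorSystem inside asyncInvoke.
  def perCall(i: Int): Int = {
    val pool: ExecutorService = Executors.newFixedThreadPool(2)
    try Await.result(request(i)(ExecutionContext.fromExecutorService(pool)), 5.seconds)
    finally pool.shutdown()
  }

  // Variant 2: build the resource once and reuse it, similar to open().
  private val sharedPool: ExecutorService = Executors.newFixedThreadPool(2)
  private val sharedEc: ExecutionContext = ExecutionContext.fromExecutorService(sharedPool)
  def reused(i: Int): Int = Await.result(request(i)(sharedEc), 5.seconds)

  // Releases the shared resource, similar to close().
  def close(): Unit = sharedPool.shutdown()

  // Average wall-clock time per call in microseconds, measured after a warm-up pass.
  def averageMicros(n: Int)(body: Int => Int): Double = {
    (0 until n).foreach(body)
    val start = System.nanoTime()
    (0 until n).foreach(body)
    (System.nanoTime() - start) / 1000.0 / n
  }

  def main(args: Array[String]): Unit = {
    try {
      println(f"init per call: ${averageMicros(200)(perCall)}%.1f us/call")
      println(f"init once:     ${averageMicros(200)(reused)}%.1f us/call")
    } finally close()
  }
}
```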

On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang <a...@parcelperform.com> wrote:

> Hi Till,
> Unfortunately I have to wait for the cluster to upgrade to 1.8 to use that
> feature: https://issues.apache.org/jira/browse/FLINK-6756
> Meanwhile I can reimplement it in a copy-paste manner, but I’m still
> curious whether my AsyncHttpClient works nicely or not, and what the
> downside would be when you look at it.
> I understand the open/close methods will help in terms of initializing and
> cleaning up resources, but how can we benchmark the solution to make sure
> one is better than the other? What is the key criterion for deciding, or do
> we have to try it in production first?
> Thanks a lot, again
>
> Andy,
>
>
> On Apr 12, 2019, at 2:44 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Andy,
>
> there is also a Scala version of the `RichAsyncFunction`.
>
> In Scala you have to specify a value for class members. This is different
> from Java.
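To illustrate this in plain Scala without Flink: a `var` declared without a value, like the fields of `RichAsyncHttpClient` in the thread, is an abstract member, so a concrete class containing it does not compile. Giving it the default value `_` (which is `null` for reference types) and assigning it later, for example in `open`, does compile. `Resource` and `Holder` are hypothetical stand-ins for `ActorSystem` and the rich function.

```scala
// Hypothetical stand-in for ActorSystem / ActorMaterializer.
final class Resource {
  def ping(): String = "pong"
}

class Holder extends Serializable {
  // `@transient var resource: Resource` (no value) would be an abstract member
  // and make this concrete class fail to compile. `= _` initializes it to null.
  @transient var resource: Resource = _

  // Mirrors RichFunction#open: runs where the function executes, before any call.
  def open(): Unit = {
    resource = new Resource
  }

  def invoke(): String = resource.ping()
}

object HolderDemo {
  def main(args: Array[String]): Unit = {
    val h = new Holder
    println(h.resource == null) // true: nothing created yet
    h.open()
    println(h.invoke())         // pong
  }
}
```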
>
> User code is first instantiated on the client where you create the job
> topology (basically where you call new RichAsyncHttpClient). The code is
> then serialized and shipped to the cluster where it is actually executed.
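The client-side instantiation and shipping can be imitated with plain Java serialization, which is what Flink uses to ship user functions. This is a sketch under the assumption that a simple `java.io` round trip is a fair model of that step. `HeavyClient`, `EagerFunction`, `TransientFunction` and `roundTrip` are hypothetical names. A field created in the constructor is created on the client and must be serialized together with the function, which fails if its type, like `ActorSystem`, is not serializable. A `@transient` field is skipped and has to be re-created on the other side.

```scala
import java.io._

// Hypothetical stand-in for ActorSystem: deliberately NOT Serializable.
final class HeavyClient {
  def call(x: Int): Int = x + 1
}

// Created in the constructor, i.e. on the client; must be serialized with the function.
class EagerFunction extends Serializable {
  val client: HeavyClient = new HeavyClient
}

// Skipped by serialization; created lazily on first use after deserialization.
class TransientFunction extends Serializable {
  @transient lazy val client: HeavyClient = new HeavyClient
  def invoke(x: Int): Int = client.call(x)
}

object ShippingDemo {
  // Serialize on the "client" and deserialize on the "TaskManager".
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    try roundTrip(new EagerFunction)
    catch { case e: NotSerializableException => println(s"eager field: $e") }

    val shipped = roundTrip(new TransientFunction)
    println(shipped.invoke(41)) // 42, client created after deserialization
  }
}
```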
>
> Cheers,
> Till
>
> On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <a...@parcelperform.com> wrote:
>
>> Hi Till,
>> Thanks for your reply. I managed to do some experiments; some worked and
>> some did not. I hope you can give me a bit more insight:
>>
>> Following your suggestion, I implemented a `RichAsyncFunction` with
>> transient fields, like this, and got an error:
>>
>> ```
>> Class 'RichAsyncHttpClient' must either be declared abstract or implement
>> abstract member 'executionContext: ExecutionContextExecutor' in
>> ‘com.parcelperform.util.RichAsyncHttpClient’
>> ```
>>
>> ```
>>
>> class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>>
>>   PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>   @transient implicit var materializer: ActorMaterializer
>>   @transient implicit var system: ActorSystem
>>   @transient implicit var executionContext: ExecutionContextExecutor
>>
>>
>>   override def asyncInvoke(input: Shipment, resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>
>>     val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>
>>     resultFutureRequested.onComplete {
>>       case Success(res) => {
>>         resultFuture.complete(Iterable[Either[Throwable, ResponseEntity]](Right(res.entity)).asJavaCollection)
>>       }
>>       case Failure(x)   => {
>>         resultFuture.complete(Iterable[Either[Throwable, ResponseEntity]](Left(x)).asJavaCollection)
>>       }
>>     }
>>   }
>>
>>   override def open(parameters: Configuration): Unit = {
>>     super.open(parameters)
>>     system = ActorSystem("my-system")
>>     executionContext = system.dispatcher
>>     materializer = ActorMaterializer()
>>   }
>> }
>>
>> ```
>>
>> Also, using that class gives me an error; I guess it’s because of a
>> Java/Scala issue. In the Flink docs, the Java example uses RichAsyncFunction
>> and the Scala example uses AsyncFunction:
>> ```
>> //    AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(), 5, TimeUnit.SECONDS, 2).print()
>> // <= ###### error: Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient
>>
>> ```
>>
>> ###
>>
>> So I tried to fix my current code again, with transient fields moved into
>> the constructor:
>> ```
>>
>>
>>
>> class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>>
>>   @transient implicit lazy val system = {
>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>     ActorSystem("my-system")
>>   }
>>   @transient implicit lazy val executionContext = {
>>     system.dispatcher
>>   }
>>   @transient implicit lazy val materializer: ActorMaterializer = {
>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>     ActorMaterializer()
>>   }
>>
>>   override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>> //    PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>> //    implicit val system = ActorSystem("my-system")
>> //    implicit val executionContext = system.dispatcher
>> //    implicit val materializer: ActorMaterializer = ActorMaterializer()
>> //    PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>
>>     val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>     resultFutureRequested.onComplete {
>>       case Success(res) => {
>>         resultFuture.complete(Iterable(Right(res.entity)))
>>       }
>>       case Failure(x)   => {
>>         resultFuture.complete(Iterable(Left(x)))
>>       }
>>     }
>>
>>   }
>>
>>   override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>   }
>> }
>>
>> ```
>> And it runs OK. The log was printed only once.
>>
>> I’m still asking about this because I haven’t understood the sentence `That way
>> you only create the `ActorMaterializer` on the `TaskManager` where the
>> operator is executed and solve the problem of serializability`. I thought
>> all the code is executed inside the TaskManager?
>>
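A small plain-Scala sketch of what happens with the `@transient lazy val` version. It assumes each parallel instance of the operator receives its own deserialized copy of the function object. Under that assumption the initializer never runs on the client, runs once per copy on first use, and is not repeated on later calls. `LazyFunction`, `Inits` and `copyOf` are hypothetical names; the counter stands in for the `###########INIT` log line. With a parallelism greater than one, the log line would therefore be expected once per parallel instance rather than once per job.

```scala
import java.io._
import java.util.concurrent.atomic.AtomicInteger

// Counts how often the lazy initializer runs (stand-in for the INIT log line).
object Inits {
  val count = new AtomicInteger(0)
}

class LazyFunction extends Serializable {
  // Not serialized; initialized on first access in each deserialized copy.
  @transient lazy val client: String = {
    Inits.count.incrementAndGet()
    "client@" + Thread.currentThread().getName
  }
  def invoke(x: Int): String = s"$client handled $x"
}

object LazyDemo {
  // Java-serializes `obj` and reads it back, like shipping it to a TaskManager.
  def copyOf[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onClient = new LazyFunction // built where the job topology is defined
    val subtask1 = copyOf(onClient)  // copy received by one parallel instance
    val subtask2 = copyOf(onClient)  // copy received by another one
    println(Inits.count.get)         // 0: nothing initialized on the client
    (1 to 3).foreach(i => subtask1.invoke(i))
    println(Inits.count.get)         // 1: once for subtask1, not once per call
    subtask2.invoke(4)
    println(Inits.count.get)         // 2: once more for subtask2
  }
}
```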
>> Thanks for being patient with me so far.
>>
>> Andy,
>>
>>
>>
>>
>> On Apr 11, 2019, at 7:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Andy,
>>
>> without being an expert on Akka's HTTP client, I think you should not
>> create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`.
>> What I would recommend instead is to implement a `RichAsyncFunction`
>> with a transient field for `ActorMaterializer` which you initialize in the
>> `RichAsyncFunction#open` method. That way you only create the
>> `ActorMaterializer` on the `TaskManager` where the operator is executed,
>> which solves the serializability problem, and you make it much more
>> efficient because you don't create a new `ActorSystem` for every request.
>>
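A sketch of that recommendation, assuming Flink 1.8 or later (where the Scala `RichAsyncFunction` from FLINK-6756 is available) and akka-http 10.1.x as listed in the original message. `Shipment` is a hypothetical stand-in for the job's own type, and the URL is the test endpoint used in this thread. Emitting the body as a `String` instead of a `ResponseEntity` is an assumption of the sketch: the entity is a stream that should be consumed, and a plain string is easier to pass to downstream operators. The Akka objects are `@transient` vars with a `_` default, created in `open` on the TaskManager and released in `close`.

```scala
import java.util.concurrent.TimeoutException

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

import scala.concurrent.{Await, ExecutionContextExecutor}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical stand-in for the job's own input type.
case class Shipment(id: String)

class RichAsyncHttpClient extends RichAsyncFunction[Shipment, Either[Throwable, String]] {

  // `= _` gives each var a value (null), so the class is no longer abstract.
  // @transient keeps them out of the serialized function; open() creates them
  // on the TaskManager, once per parallel instance.
  @transient private implicit var system: ActorSystem = _
  @transient private implicit var materializer: ActorMaterializer = _
  @transient private implicit var executionContext: ExecutionContextExecutor = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    system = ActorSystem("my-system")
    materializer = ActorMaterializer()
    executionContext = system.dispatcher
  }

  override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, String]]): Unit =
    Http()
      .singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
      // Read the body into a String; this also consumes the entity so the
      // pooled connection can be reused.
      .flatMap(response => Unmarshal(response.entity).to[String])
      .onComplete {
        case Success(body) => resultFuture.complete(Iterable(Right(body)))
        case Failure(e)    => resultFuture.complete(Iterable(Left(e)))
      }

  override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, String]]): Unit =
    resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))

  override def close(): Unit = {
    // Shut the ActorSystem down so its threads do not outlive the task.
    if (system != null) Await.ready(system.terminate(), 10.seconds)
    super.close()
  }
}
```

Because this class extends the Scala `AsyncFunction`, it should fit the Scala `AsyncDataStream` call from the thread, e.g. `AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient, 5, TimeUnit.SECONDS, 2)` with `org.apache.flink.streaming.api.scala._` imported.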
>> Cheers,
>> Till
>>
>> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <a...@parcelperform.com>
>> wrote:
>>
>>> Hi guys,
>>>
>>> I’m trying to decide which HTTP client to use with Flink. Currently I have
>>> tested scalaj and the Akka HTTP client, and both work OK in our current dev
>>> environment.
>>> For scalaj it is pretty straightforward, since it just makes an HTTP
>>> request with a timeout.
>>>
>>> For the Akka HTTP client it’s a bit more complicated (I’m new to Scala and
>>> all), so I’m asking whether I am doing it right by creating an AsyncFunction
>>> like this:
>>> ```
>>>
>>> class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>>>
>>>   override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>
>>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>     implicit val system = ActorSystem("my-system")
>>>     implicit val executionContext = system.dispatcher
>>>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>>>     val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>>
>>>
>>>     resultFutureRequested.onComplete {
>>>       case Success(res) => {
>>>         resultFuture.complete(Iterable(Right(res.entity)))
>>>       }
>>>       case Failure(x)   => {
>>>         resultFuture.complete(Iterable(Left(x)))
>>>       }
>>>     }
>>>
>>>   }
>>>
>>>   override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>>   }
>>> }
>>>
>>> ```
>>> I notice that I have to implicitly create a bunch of variables inside the
>>> asyncInvoke method. I’m not sure if I’m doing it right, or just adding
>>> overhead. I did try to initialize them in the constructor of the class, but
>>> the compiler just threw a bunch of Not implemented Serializer errors.
>>>
>>> My lib:
>>>   "com.typesafe.akka" %% "akka-http" % "10.1.8",
>>>   "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>>>
>>> My flink:
>>> scala 2.12
>>> flink 1.7.0
>>>
>>>
>>>
>>> Any replies are appreciated!
>>>
>>> Thanks a bunch
>>>
>>> Andy,
>>>
>>>
>>>
>>>
>>
>
