Hey Everyone,
We have a use case where we want to extract a response
out of a AsyncSink Operator(HTTP in our case) and perform more
transformation
on top of it.

We implemented a HttpSink by following this
blog https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/
.

Since By design sink operators won't return back any response stream, we
have built
our own custom writerOperator(HttpSinkWriteOperator) by extending
 AbstractStreamOperator
and emitting the HTTP response into the output stream.

The HTTP operator works fine when used as a sink but when we try to extract
response from it
and write it to S3 using(stream.sinkTo(S3Sink)) we are seeing the
job checkpoints are taking too long and eventually fail after(5min timeout
configured).

We are trying to use AsyncSinkBase because of it's built-in capability of
*adaptive rate limiting* and retries as opposed to AsyncRichFunction.

Is it fundamentally the correct thing to do? Is it possible to extract the
response
out of an Operator implemented using AsyncSinkBase?

Reply via email to