Hey everyone, we have a use case where we want to extract the response from an AsyncSink operator (HTTP in our case) and apply further transformations to it.
We implemented an HttpSink by following this blog post: https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/ . Since sink operators by design do not return a response stream, we built our own custom writer operator (HttpSinkWriteOperator) that extends AbstractStreamOperator and emits the HTTP response into the output stream. The HTTP operator works fine when used as a plain sink. However, when we extract the response from it and write it to S3 (using stream.sinkTo(S3Sink)), the job's checkpoints take too long and eventually fail once they hit the 5-minute timeout we have configured.

We are using AsyncSinkBase rather than RichAsyncFunction because of its built-in *adaptive rate limiting* and retries. Is this fundamentally the right approach? Is it possible to extract the response from an operator implemented with AsyncSinkBase?
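
To make the setup concrete, here is a minimal plain-Java sketch of the pattern. It is not our actual operator and uses no Flink types. Everything in it is a hypothetical stand-in: `ResponseEmittingWriter`, the `client` function in place of the HTTP call, the `downstream` consumer in place of the operator's output, and a fixed in-flight limit in place of adaptive rate limiting. `flush()` blocks until every in-flight request has finished, which is an assumption about what a checkpoint would have to wait for.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for a writer that forwards responses instead of discarding them. */
final class ResponseEmittingWriter<IN, OUT> {
    private final Function<IN, CompletableFuture<OUT>> client; // stand-in for the async HTTP call
    private final Consumer<OUT> downstream;                    // stand-in for the operator output
    private final Semaphore permits;                           // bounds the in-flight requests
    private final int maxInFlight;
    private final int maxRetries;
    private final Object emitLock = new Object();              // serializes downstream emission

    ResponseEmittingWriter(Function<IN, CompletableFuture<OUT>> client,
                           Consumer<OUT> downstream,
                           int maxInFlight,
                           int maxRetries) {
        this.client = client;
        this.downstream = downstream;
        this.permits = new Semaphore(maxInFlight);
        this.maxInFlight = maxInFlight;
        this.maxRetries = maxRetries;
    }

    /** Blocks the caller (backpressure) while maxInFlight requests are already pending. */
    void write(IN element) {
        permits.acquireUninterruptibly();
        send(element, 0);
    }

    private void send(IN element, int attempt) {
        client.apply(element).whenComplete((response, error) -> {
            if (error != null && attempt < maxRetries) {
                send(element, attempt + 1); // retry while still holding the permit
                return;
            }
            try {
                if (error == null) {
                    synchronized (emitLock) {
                        downstream.accept(response); // forward the response downstream
                    }
                }
                // Retries exhausted: the element is dropped in this sketch;
                // a real operator would have to surface the failure.
            } finally {
                permits.release();
            }
        });
    }

    /** Waits until no request is in flight (assumed to be needed before a checkpoint). */
    void flush() {
        permits.acquireUninterruptibly(maxInFlight);
        permits.release(maxInFlight);
    }
}
```

In this sketch, `flush()` cannot return while any request is still pending or being retried, so a slow endpoint or a blocked `downstream` consumer directly delays whoever calls it.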