Then my statement must be wrong. Let me double-check this. Yesterday, when checking the usage of the objectReuse field, I could only see it being used in the batch operators. I'll get back to you.
Cheers,
Till

On Wed, Feb 19, 2020, 07:05 Jin Yi <eleanore....@gmail.com> wrote:

> Hi Till,
>
> I just read your comment:
>
> Currently, enabling object reuse via ExecutionConfig.enableObjectReuse() only affects the DataSet API. DataStream programs will always do defensive copies. There is a FLIP to improve this behaviour [1].
>
> I have an application that is written in Apache Beam, but the runner is Flink. In the configuration of the pipeline, it runs in streaming mode, and I see a performance difference between enabling and disabling object reuse. Also, when running in debug mode, I noticed that with objectReuse set to true there is no serialization/deserialization happening between operators; however, when it is set to false, serialization and deserialization happen between each operator. Do you have any idea why this is happening?
>
> MyOptions options = PipelineOptionsFactory.as(MyOptions.class);
> options.setStreaming(true);
> options.setObjectReuse(true);
>
> Thanks a lot!
>
> Eleanore
>
> On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Theo,
>>
>> the KafkaDeserializationSchema does not allow returning asynchronous results. Hence, Flink will always wait until KafkaDeserializationSchema.deserialize returns the parsed value. Consequently, the only way I can think of to offload the complex parsing logic would be to do it in a downstream operator, where you could use Async I/O to run the parsing logic in a thread pool, for example.
>>
>> Alternatively, you could think about a simple program which transforms your input events into another format from which it is easier to extract the timestamp. This comes, however, at the cost of another Kafka topic.
>>
>> Currently, enabling object reuse via ExecutionConfig.enableObjectReuse() only affects the DataSet API. DataStream programs will always do defensive copies. There is a FLIP to improve this behaviour [1].
>>
>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:
>>
>>> Hi,
>>>
>>> As for most pipelines, our Flink pipeline starts with parsing source Kafka events into POJOs. We perform this step within a KafkaDeserializationSchema so that we can properly extract the event time timestamp for the downstream timestamp assigner.
>>>
>>> Now it turned out that parsing is currently the most CPU-intensive task in our pipeline, and CPU thus bounds the number of elements we can ingest per second. Further splitting up the partitions will be hard, as we need to maintain the exact order of events per partition, and it would also require quite some architectural changes for the producers and the Flink job.
>>>
>>> Now I had the idea to put the parsing task into ordered Async I/O. But Async I/O can only be plugged into an existing stream, not into the deserialization schema, as far as I see. So the best idea I currently have is to keep the parsing in the DeserializationSchema as minimal as possible, extracting only the event timestamp, and do the full parsing downstream in Async I/O. This, however, seems a bit tedious, especially as we have to deal with multiple input formats and would need to develop two parsers for the heavy-load ones: a timestamp-only parser and a full parser.
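A minimal sketch of the split described above (cheap timestamp extraction in the KafkaDeserializationSchema, expensive parsing behind ordered Async I/O in a downstream operator), assuming the Java DataStream API. RawEvent, ParsedEvent, TimestampOnlySchema, AsyncParser, the thread-pool size and the "timestamp in the first 8 bytes" layout are hypothetical placeholders, not the application's actual types or format:

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Hypothetical wrapper: raw payload plus the cheaply extracted event timestamp. */
class RawEvent {
    long eventTimestamp;
    byte[] payload;
}

/** Hypothetical fully parsed POJO. */
class ParsedEvent {
    long eventTimestamp;
    // remaining fields produced by the expensive parser
}

/** Minimal schema: only extract the timestamp, defer the expensive parsing. */
class TimestampOnlySchema implements KafkaDeserializationSchema<RawEvent> {
    @Override
    public RawEvent deserialize(ConsumerRecord<byte[], byte[]> record) {
        RawEvent event = new RawEvent();
        event.payload = record.value();
        // Placeholder: assumes the event time is encoded in the first 8 bytes.
        event.eventTimestamp = ByteBuffer.wrap(record.value()).getLong();
        return event;
    }

    @Override
    public boolean isEndOfStream(RawEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RawEvent> getProducedType() {
        return TypeInformation.of(RawEvent.class);
    }
}

/** Expensive parsing offloaded to a thread pool behind Async I/O. */
class AsyncParser extends RichAsyncFunction<RawEvent, ParsedEvent> {
    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(8);
    }

    @Override
    public void asyncInvoke(RawEvent input, ResultFuture<ParsedEvent> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> fullParse(input), executor)
                .thenAccept(parsed -> resultFuture.complete(Collections.singleton(parsed)));
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    private ParsedEvent fullParse(RawEvent raw) {
        // Placeholder for the CPU-heavy parser.
        ParsedEvent parsed = new ParsedEvent();
        parsed.eventTimestamp = raw.eventTimestamp;
        return parsed;
    }
}

The wiring would then use AsyncDataStream.orderedWait, which emits results in input order, so the per-partition ordering requirement is preserved (timeout and capacity values below are arbitrary):

DataStream<ParsedEvent> parsed = AsyncDataStream.orderedWait(
        rawStream, new AsyncParser(), 30, TimeUnit.SECONDS, 100);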
>>> Do you know if it is somehow possible to parallelize or use Async I/O for the parsing within the KafkaDeserializationSchema? I don't have state access in there, and I don't have a "collector" object in there, so one input element needs to produce exactly one output element.
>>>
>>> Another question: my parsing produces Java POJO objects via "new", which are sent downstream (with the object reuse setting enabled) and finally get garbage collected once they reach the sink. Is there some mechanism in Flink so that I could reuse "old" POJOs that already passed through the sink back in the source? All tasks are chained, so theoretically that could be possible?
>>>
>>> Best regards
>>> Theo
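For reference, a minimal sketch of where the object reuse switch discussed in this thread lives when building a plain Flink DataStream job; Beam's FlinkPipelineOptions.setObjectReuse presumably maps onto the same ExecutionConfig flag, and whether that flag actually avoids copies between chained DataStream operators is exactly what Till wants to double-check above:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With object reuse enabled, operators may receive the same object
        // instance instead of a copy, so user functions must not cache or
        // mutate input records after emitting them.
        env.getConfig().enableObjectReuse();
        // ... build the pipeline, then env.execute("job name");
    }
}

To my knowledge there is no built-in pool that hands POJOs which already passed the sink back to the source; the object reuse setting above is the closest existing knob.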