Then my statement must be wrong. Let me double check this. Yesterday when
checking the usage of the objectReuse field, I could only see it in the
batch operators. I'll get back to you.

Cheers,
Till

On Wed, Feb 19, 2020, 07:05 Jin Yi <eleanore....@gmail.com> wrote:

> Hi Till,
> I just read your comment:
> Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
> only affects the DataSet API. DataStream programs will always do defensive
> copies. There is a FLIP to improve this behaviour [1].
>
> I have an application written in Apache Beam that runs on the Flink
> runner. The pipeline is configured in streaming mode, and I see a
> performance difference between enabling and disabling objectReuse. When
> running in debug mode, I also noticed that with objectReuse set to true
> there is no serialization/deserialization between operators, whereas with
> objectReuse set to false, serialization and deserialization happen between
> each pair of operators. Do you have any idea why this is happening?
>
> MyOptions options = PipelineOptionsFactory.as(MyOptions.class);
>
> options.setStreaming(true);
>
> options.setObjectReuse(true);
>
>
> Thanks a lot!
>
> Eleanore
>
>
> On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Theo,
>>
>> the KafkaDeserializationSchema does not allow returning asynchronous
>> results. Hence, Flink will always wait until
>> KafkaDeserializationSchema.deserialize returns the parsed value.
>> Consequently, the only way I can think of to offload the complex parsing
>> logic would be to do it in a downstream operator where you could use
>> AsyncI/O to run the parsing logic in a thread pool, for example.
>>
>> Alternatively, you could think about a simple program which transforms
>> your input events into another format where it is easier to extract the
>> timestamp from. This comes, however, at the cost of another Kafka topic.
>>
>> Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
>> only affects the DataSet API. DataStream programs will always do defensive
>> copies. There is a FLIP to improve this behaviour [1].
>>
>> [1]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal <
>> theo.diefent...@scoop-software.de> wrote:
>>
>>> Hi,
>>>
>>> As with most pipelines, our Flink pipeline starts with parsing source
>>> Kafka events into POJOs. We perform this step within a
>>> KafkaDeserializationSchema so that we properly extract the event time
>>> timestamp for the downstream Timestamp-Assigner.
>>>
>>> Now it turned out that parsing is currently the most CPU-intensive task
>>> in our pipeline, so the CPU bounds the number of elements we can ingest
>>> per second. Further splitting up the partitions will be hard, as we need to
>>> maintain the exact order of events per partition, and it would also require
>>> quite some architectural changes for the producers and the Flink job.
>>>
>>> Now I had the idea to put the parsing task into ordered Async I/O. But
>>> Async I/O can only be plugged into an existing stream, not into the
>>> deserialization schema, as far as I can see. So the best idea I currently
>>> have is to keep parsing in the DeserializationSchema as minimal as possible,
>>> just enough to extract the event timestamp, and do the full parsing
>>> downstream in Async I/O. This, however, seems a bit tedious, especially as
>>> we have to deal with multiple input formats and would need to develop two
>>> parsers for the heavy-load ones: a timestamp-only parser and a full parser.
>>>
>>> Do you know if it is somehow possible to parallelize / async IO the
>>> parsing within the KafkaDeserializationSchema? I don't have state access in
>>> there and I don't have a "collector" object in there so that one element as
>>> input needs to produce exactly one output element.
>>>
>>> Another question: My parsing produces Java POJO objects via "new", which
>>> are sent downstream (reusePOJO setting set) and are finally garbage
>>> collected once they have reached the sink. Is there some mechanism in Flink
>>> that would let me reuse "old" POJOs from the sink in the source? All tasks
>>> are chained, so theoretically that could be possible?
>>>
>>> Best regards
>>> Theo
>>>
>>
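
The ordered Async I/O idea discussed above (run the expensive parsing in a
thread pool while still emitting results in input order) can be sketched in
plain Java. This is only a minimal illustration under stated assumptions, not
Flink's Async I/O API: the class OrderedAsyncParse, the parseTimestamp helper,
and the "timestamp,payload" record format are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class OrderedAsyncParse {

    // Hypothetical stand-in for the expensive parser: reads the leading
    // timestamp from a record of the assumed form "timestamp,payload".
    public static long parseTimestamp(String raw) {
        return Long.parseLong(raw.substring(0, raw.indexOf(',')));
    }

    // Submits every record to the pool so parsing runs concurrently, then
    // joins the futures in submission order so the output order matches
    // the input order regardless of which parse finishes first.
    public static <I, O> List<O> parseInOrder(
            List<I> records, Function<I, O> parser, ExecutorService pool) {
        List<CompletableFuture<O>> pending = new ArrayList<>();
        for (I record : records) {
            pending.add(CompletableFuture.supplyAsync(() -> parser.apply(record), pool));
        }
        List<O> results = new ArrayList<>(pending.size());
        for (CompletableFuture<O> future : pending) {
            results.add(future.join()); // blocks until this record is parsed
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> raw = Arrays.asList("1000,a", "1001,b", "1002,c");
            // Prints the parsed timestamps in the same order as the input.
            System.out.println(parseInOrder(raw, OrderedAsyncParse::parseTimestamp, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Joining the futures in submission order is what preserves per-partition
ordering here; a real job would instead rely on the ordering guarantees of
Flink's own Async I/O operator in a downstream step.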
