Thanks for answering.

Not sure I understood the hack suggestion. If I copy SequenceNumber over to
my job, how will the original Flink Kinesis lib use that class? It's fixed
to a specific package (in this case,
org.apache.flink.streaming.connectors.kinesis.model). Unless you meant to
somehow hack the JAR itself and replace the class with an annotated one?

About the backpressure: I've eliminated almost everything by now, so I don't
know what it could be. I've run out of ideas, so I'm starting to look into
serialization. The job is very, very simple, with no algorithms. Most
operations are just list/set concatenations, and I still get backpressure no
matter how big a cluster I use. I know where the backpressure is. I've also
started profiling, and there's not a single function that is slow. GC also
looks good, with no long pauses.

On Thu, Oct 14, 2021 at 3:53 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hey Ori,
>
> As for the SequenceNumber issue, I'd say yes, it can be seen as a bug. In
> the current state, one cannot use the Kinesis consumer with
> pipeline.generic-types=false. The problem is that we use the
> TypeInformation.of(SequenceNumber.class) method, which in this case will
> always fall back to GenericTypeInfo. (GenericTypeInfo is the one that
> uses KryoSerializer. That is why registering a Kryo serializer does not
> help: the type is still a generic type.)
>
> A dirty hack you could try is to copy the SequenceNumber class over to
> your job and annotate it with @TypeInfo, providing a factory that creates
> something other than GenericTypeInfo (you could even use a copy of
> GenericTypeInfo with the check for the pipeline.generic-types flag
> removed). I know it is a really dirty hack.
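
Below is a sketch of that hack. It assumes the Flink 1.12 APIs @TypeInfo, TypeInfoFactory, GenericTypeInfo and KryoSerializer. LenientTypeInfoDemo, LenientGenericTypeInfo, LenientTypeInfoFactory and AnnotatedType are hypothetical names. AnnotatedType stands in for the copied SequenceNumber. For the real copy to take effect, it would presumably have to keep the fully qualified name org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber and be the version actually loaded at runtime; this sketch does not cover that. Rather than copying GenericTypeInfo wholesale, it subclasses it and overrides createSerializer(...) without the generic-types check.

```java
import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

public class LenientTypeInfoDemo {

    /** Hypothetical GenericTypeInfo variant that ignores pipeline.generic-types. */
    public static class LenientGenericTypeInfo<T> extends GenericTypeInfo<T> {
        public LenientGenericTypeInfo(Class<T> typeClass) {
            super(typeClass);
        }

        @Override
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
            // GenericTypeInfo#createSerializer throws when generic types are
            // disabled; this override always returns the Kryo serializer instead.
            return new KryoSerializer<>(getTypeClass(), config);
        }
    }

    /** Hypothetical factory, instantiated by Flink's type extractor via @TypeInfo. */
    public static class LenientTypeInfoFactory<T> extends TypeInfoFactory<T> {
        @Override
        @SuppressWarnings("unchecked")
        public TypeInformation<T> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            // For a non-generic class, t is the annotated Class itself.
            return new LenientGenericTypeInfo<>((Class<T>) t);
        }
    }

    /** Hypothetical stand-in for the copied, annotated SequenceNumber. */
    @TypeInfo(LenientTypeInfoFactory.class)
    public static final class AnnotatedType {
        private final String value;

        public AnnotatedType(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }
    }

    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        config.disableGenericTypes(); // same effect as pipeline.generic-types=false

        TypeInformation<AnnotatedType> info = TypeInformation.of(AnnotatedType.class);
        TypeSerializer<AnnotatedType> serializer = info.createSerializer(config);
        System.out.println(info.getClass().getSimpleName()
                + " -> " + serializer.getClass().getSimpleName());
    }
}
```

With generic types disabled, main should obtain a KryoSerializer instead of throwing. Note that the type is still serialized with Kryo, so this only silences the check. It does not make serialization of that type any faster.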
>
> Regarding question 2: unfortunately, I can't think of a better way.
>
> I have created FLINK-24549 to track the Kinesis issue [1].
>
> On the backpressure note, are you sure the issue is in serialization?
> Have you tried identifying the slow task first? [2]
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-24549
>
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>
> On 14/10/2021 12:41, Ori Popowski wrote:
>
> I'd appreciate it if someone could advise on this issue.
>
> Thanks
>
> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> Hi,
>>
>> I'm seeing heavy backpressure in a fairly simple Flink application
>> written in Scala, using Flink version 1.12.1.
>>
>> To find the source of the problem, I want to eliminate all classes that
>> fall back to generic serialization, so I set
>> pipeline.generic-types=false
>>
>> in order to spot those classes and write serializers for them.
>>
>> However, for some reason, I get the stack trace attached below.
>>
>>    1. It looks suspicious that one of Flink's own classes doesn't have a
>>    serializer and falls back to generic serialization. Is this a bug?
>>    2. I want to get a list of all classes that fall back to generic
>>    serialization. How can I do that other than by setting
>>    pipeline.generic-types=false and eliminating those classes one by one?
>>    3. I defined a custom Kryo serializer for this class using both
>>    addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…), and
>>    I still get the same error message. How can I provide Flink with custom
>>    serialization so it stops complaining about this?
>>
>>
>>
>> java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is treated as a generic type.
>>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
>>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
>>     at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
>>     at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>>
>>
