Thanks for answering. Not sure I understood the hack suggestion. If I copy SequenceNumber over to my job, how will the original Flink Kinesis lib use that class? It's tied to a specific package (in this case org.apache.flink.streaming.connectors.kinesis.model). Unless you meant to somehow hack the JAR itself and replace the class with an annotated class?
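For reference, a minimal sketch of the @TypeInfo mechanism from Dawid's suggestion, assuming Flink 1.12's `flink-core` on the classpath. It is shown on a hypothetical class `MyEvent` rather than on SequenceNumber, whose body would have to be copied verbatim from the connector sources of your Flink version. `KryoWithoutCheckTypeInfo` and `MyEventTypeInfoFactory` are illustrative names, not Flink classes; the former plays the role of "a copy of GenericTypeInfo with the pipeline.generic-types check removed". For the actual hack, the copied SequenceNumber would presumably have to keep its original fully-qualified name so that FlinkKinesisConsumer resolves to it, and the build would have to ensure that copy is the one loaded instead of the connector's (for example by excluding the original from the job JAR). That packaging step is an assumption and is not covered here.

```java
import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

public class KryoWithoutCheckExample {

    /** Hypothetical: behaves like GenericTypeInfo but never checks pipeline.generic-types. */
    public static class KryoWithoutCheckTypeInfo<T> extends TypeInformation<T> {
        private final Class<T> typeClass;

        public KryoWithoutCheckTypeInfo(Class<T> typeClass) {
            this.typeClass = typeClass;
        }

        @Override public boolean isBasicType() { return false; }
        @Override public boolean isTupleType() { return false; }
        @Override public int getArity() { return 1; }
        @Override public int getTotalFields() { return 1; }
        @Override public Class<T> getTypeClass() { return typeClass; }
        // Conservative choice: this type cannot be used as a key.
        @Override public boolean isKeyType() { return false; }

        @Override
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
            // GenericTypeInfo throws here when generic types are disabled; this copy does not.
            return new KryoSerializer<>(typeClass, config);
        }

        @Override public String toString() { return "KryoWithoutCheckTypeInfo<" + typeClass.getName() + ">"; }
        @Override public boolean equals(Object obj) {
            return obj instanceof KryoWithoutCheckTypeInfo
                    && ((KryoWithoutCheckTypeInfo<?>) obj).typeClass == typeClass;
        }
        @Override public int hashCode() { return typeClass.hashCode(); }
        @Override public boolean canEqual(Object obj) { return obj instanceof KryoWithoutCheckTypeInfo; }
    }

    /** Hypothetical factory; Flink instantiates it reflectively, so it must be public with a no-arg constructor. */
    public static class MyEventTypeInfoFactory extends TypeInfoFactory<MyEvent> {
        @Override
        public TypeInformation<MyEvent> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
            return new KryoWithoutCheckTypeInfo<>(MyEvent.class);
        }
    }

    /** Hypothetical stand-in for the copied SequenceNumber: the annotation is the only addition. */
    @TypeInfo(MyEventTypeInfoFactory.class)
    public static class MyEvent {
        private final String id;
        public MyEvent(String id) { this.id = id; }
        public String getId() { return id; }
    }

    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        config.disableGenericTypes(); // same effect as pipeline.generic-types=false
        // The type extractor consults the @TypeInfo factory before any other analysis.
        TypeInformation<MyEvent> info = TypeInformation.of(MyEvent.class);
        TypeSerializer<MyEvent> serializer = info.createSerializer(config);
        System.out.println(info + " -> " + serializer.getClass().getSimpleName());
    }
}
```

The data is still serialized with Kryo, so this only silences the check; it does not make SequenceNumber any cheaper to serialize.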
About the backpressure: I've eliminated almost everything by now, so I don't know what it could be. I've run out of ideas, so I'm starting to look into serialization. The job is very, very simple. No algorithms. Most operations are just list/set concatenations, and I'm still getting backpressure no matter how big a cluster I use. I know where the backpressure is. I've also started profiling, and there's not a single function that is slow. GC also looks good, with no long pauses.

On Thu, Oct 14, 2021 at 3:53 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> Hey Ori,
>
> As for the SequenceNumber issue, I'd say yes, it can be seen as a bug. In
> the current state one cannot use the Kinesis consumer with
> pipeline.generic-types=false. The problem is that we use the
> TypeInformation.of(SequenceNumber.class) method, which in this case will
> always fall back to GenericTypeInfo (GenericTypeInfo is the one that
> uses KryoSerializer. That is why registering a Kryo serializer does not
> help: it is still a generic type).
>
> A dirty hack for you to try could be to copy the SequenceNumber class over
> to your job and annotate it with @TypeInfo, providing a factory that
> creates something other than GenericTypeInfo (you could even use a
> copy of GenericTypeInfo, but with the check for the
> pipeline.generic-types flag removed). I know it is a really dirty hack.
>
> Ad. 2: Unfortunately I can't think of a better way.
>
> I have created FLINK-24549 to track the Kinesis issue.[1]
>
> On the backpressure note, are you sure the issue is in the serialization?
> Have you tried identifying the slow task first?[2]
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-24549
>
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>
> On 14/10/2021 12:41, Ori Popowski wrote:
>
> > I'd appreciate it if someone could advise on this issue.
> >
> > Thanks
> >
> > On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski <ori....@gmail.com> wrote:
>> Hi,
>>
>> I have heavy backpressure in a fairly simple Flink application written
>> in Scala, using Flink 1.12.1.
>>
>> To find the source of the problem, I want to eliminate all classes that
>> use generic serialization, so I set
>> pipeline.generic-types=false
>> in order to spot those classes and write a serializer for them.
>>
>> However, for some reason, I get the stacktrace attached below.
>>
>> 1. It looks suspicious that one of Flink's own classes doesn't have a
>>    serializer and falls back to generic serialization. Is this a bug?
>> 2. I want to get a list of all classes that fall back to generic
>>    serialization. How can I do that other than setting
>>    pipeline.generic-types=false and eliminating those classes one by one?
>> 3. I defined a custom Kryo serializer for this class using both
>>    addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…), and
>>    I still get the same error message. How can I provide Flink with custom
>>    serialization so it stops complaining about this?
>>
>> java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is treated as a generic type.
>>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
>>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
>>     at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
>>     at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>     at java.lang.Thread.run(Thread.java:748)
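On the second question in the original message (listing every class that falls back to generic serialization): this does not replace the one-by-one approach Dawid refers to, but it lets you check candidate types offline, without running the job. A minimal sketch, assuming Flink 1.12's `flink-core` on the classpath: take the TypeInformation Flink derives for a type and walk it recursively, collecting every GenericTypeInfo leaf. `GenericTypeFinder`, `Event` and `Opaque` are hypothetical names. It only sees the types you pass in (connector-internal state such as the SequenceNumber list in the trace above stays invisible unless passed explicitly), and it only descends into composite, list, map and object-array type infos. For Scala case classes, passing the TypeInformation produced by the Scala API's `createTypeInformation` is likely closer to what the job actually uses than `TypeInformation.of`.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;

public class GenericTypeFinder {

    /** Returns the distinct classes inside {@code info} that Flink would treat as generic (Kryo) types. */
    public static List<Class<?>> findGenericTypes(TypeInformation<?> info) {
        List<Class<?>> found = new ArrayList<>();
        collect(info, found);
        return found;
    }

    private static void collect(TypeInformation<?> info, List<Class<?>> found) {
        if (info instanceof GenericTypeInfo) {
            Class<?> c = info.getTypeClass();
            if (!found.contains(c)) {
                found.add(c);
            }
        } else if (info instanceof CompositeType) {
            // POJOs, tuples, rows and Scala case classes expose their fields by position.
            CompositeType<?> composite = (CompositeType<?>) info;
            for (int i = 0; i < composite.getArity(); i++) {
                collect(composite.getTypeAt(i), found);
            }
        } else if (info instanceof ListTypeInfo) {
            collect(((ListTypeInfo<?>) info).getElementTypeInfo(), found);
        } else if (info instanceof MapTypeInfo) {
            MapTypeInfo<?, ?> map = (MapTypeInfo<?, ?>) info;
            collect(map.getKeyTypeInfo(), found);
            collect(map.getValueTypeInfo(), found);
        } else if (info instanceof ObjectArrayTypeInfo) {
            collect(((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo(), found);
        }
        // Any other type info is treated as a leaf that does not use Kryo.
    }

    /** Hypothetical: no public no-arg constructor, so Flink cannot treat it as a POJO. */
    public static class Opaque {
        private final String value;
        public Opaque(String value) { this.value = value; }
        public String getValue() { return value; }
    }

    /** Hypothetical: a valid POJO with one field that Flink cannot analyze. */
    public static class Event {
        public String id;
        public Opaque payload;
        public Event() {}
    }

    public static void main(String[] args) {
        System.out.println(findGenericTypes(TypeInformation.of(Event.class)));
    }
}
```

Here `Event` itself is analyzed as a POJO, so only its `Opaque` field should be reported; that is the kind of class the pipeline.generic-types=false run would otherwise surface one failure at a time.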