Thank you Arvid, I was going to suggest something like this as well.
We use Testcontainers and the docker images provided by Ververica to do
exactly this in our team.

I am currently working on a small project on GitHub that I will share for
use cases like this.
The project will contain some example sources and sinks together with a
generic Flink application.
I will follow up sometime during the weekend with a PoC. It's super
straightforward to set up and use.

To elaborate a bit more on Arvid's suggestion:

   - Use Testcontainers as a base to configure your integration test.
   - LocalStack <https://github.com/localstack/localstack> is a fully
   functional docker container that you can use to mock various AWS services.
   Since it's unclear which sink you're using, I just want to throw this out
   there.
   - Set up two containers abstracting the job manager and task manager
   according to this
   <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html#flink-with-docker-compose>
   documentation. If you decide to go with the application-cluster route,
   then I suggest setting up the task manager and job manager as
   GenericContainers (see the sketch after this list). The rationale is that
   if you do everything in docker-compose and use a DockerComposeContainer,
   the application will start before you have a chance to mock the data in
   your source, as the DockerComposeContainer is started immediately iirc
   (which may be problematic depending on the way your application is
   configured to read from Kafka).
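
As a rough illustration, here is a minimal sketch of that setup (the image
tag and the JOB_MANAGER_RPC_ADDRESS environment variable are assumptions
based on the linked documentation; adjust them to your Flink version):

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

public class FlinkClusterSketch {

    // Assumed image tag; pick the one matching your Flink version.
    private static final DockerImageName FLINK_IMAGE =
            DockerImageName.parse("flink:1.12.2-scala_2.12");

    public static void main(String[] args) {
        Network network = Network.newNetwork();

        // Job manager, reachable by the task manager under the alias "jobmanager".
        GenericContainer<?> jobManager = new GenericContainer<>(FLINK_IMAGE)
                .withCommand("jobmanager")
                .withNetwork(network)
                .withNetworkAliases("jobmanager")
                .withExposedPorts(8081)
                .withEnv("JOB_MANAGER_RPC_ADDRESS", "jobmanager");

        // Task manager, registering with the job manager above.
        GenericContainer<?> taskManager = new GenericContainer<>(FLINK_IMAGE)
                .withCommand("taskmanager")
                .withNetwork(network)
                .withEnv("JOB_MANAGER_RPC_ADDRESS", "jobmanager")
                .dependsOn(jobManager);

        // Starting the containers explicitly (rather than via docker-compose)
        // leaves you a window to mock the source data before the job runs.
        jobManager.start();
        taskManager.start();
    }
}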

In fact, one of the major benefits is that you can simply configure the
source and sink and run the application outside of docker (as a
LocalStreamEnvironment).
This enables you to set breakpoints where the application is throwing the
exception, which is especially valuable in circumstances like this where
the stack trace is not very descriptive.
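
For reference, a minimal sketch of what I mean (when launched from the
IDE, getExecutionEnvironment() already returns a local environment, so
this is mostly about being explicit):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Runs the whole pipeline in-process; IDE breakpoints then work as usual.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment();
// ... configure the job against the containerized source/sink, then:
env.execute("local-debug-run");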

Best,
Arian Rohani


On Thu, Apr 1, 2021 at 15:00 Arvid Heise <ar...@apache.org> wrote:

> Arian gave good pointers, but I'd go even further: you should have ITCases
> where you pretty much just execute a mini job with docker-based Kafka and
> run it automatically.
> I strongly recommend checking out testcontainers [1]; it makes writing
> such a test a really smooth experience.
>
> [1] https://www.testcontainers.org/modules/kafka/
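>
> A minimal sketch of that module in use (the image tag is an assumption;
> pick the Confluent Platform version matching your Kafka client):
>
> import org.testcontainers.containers.KafkaContainer;
> import org.testcontainers.utility.DockerImageName;
>
> // Spins up a single-node Kafka broker in docker for the test.
> KafkaContainer kafka =
>         new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
> kafka.start();
>
> // Point the consumer properties of your job at this address.
> String bootstrapServers = kafka.getBootstrapServers();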
>
>
> On Wed, Mar 31, 2021 at 2:29 PM Arian Rohani <arianroh...@gmail.com>
> wrote:
>
>> The issue at hand is that the record contains an unmodifiable collection
>> which the Kryo serialiser attempts to modify by first initialising the
>> object and then adding items to the collection (iirc).
>>
>> Caused by: java.lang.UnsupportedOperationException
>>>         at
>>> java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>>
>>
>> Without knowing the specifics of what exactly you are trying to
>> deserialise, I can only attempt to give a generic answer, which is to try
>> something like:
>>
>>
>>> StreamExecutionEnvironment see =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>> Class<?> unmodColl =
>>>         Class.forName("java.util.Collections$UnmodifiableCollection");
>>> see.getConfig().addDefaultKryoSerializer(unmodColl,
>>>         UnmodifiableCollectionsSerializer.class);
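>>
>> (UnmodifiableCollectionsSerializer comes from the de.javakaffee
>> kryo-serializers library, so you also need that artifact on your
>> classpath.)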
>>
>>
>> An even better approach is to set up a local sandbox environment in
>> docker with Kafka and a sink of your choice, and to simply run the
>> application from the main method in debug mode with a breakpoint set
>> right before it throws the exception.
>>
>> Kind regards,
>> Arian Rohani
>>
>>
>> On Wed, Mar 31, 2021 at 13:27 Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> Hi Maminspapin,
>>> I haven't worked with Kafka/Flink yet, but have you had a look at the
>>> docs about the DeserializationSchema [1]? They
>>> mention ConfluentRegistryAvroDeserializationSchema. Is this something
>>> you're looking for?
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
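>>>
>>> A minimal sketch of how that could look (topic name, registry URL,
>>> schemaString and consumerProperties below are placeholders; it assumes
>>> the flink-avro-confluent-registry dependency on the classpath):
>>>
>>> import org.apache.avro.Schema;
>>> import org.apache.avro.generic.GenericRecord;
>>> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>>
>>> // The reader schema the records are expected to conform to.
>>> Schema readerSchema = new Schema.Parser().parse(schemaString);
>>>
>>> // Deserializes Confluent-framed Avro records; the schema's produced
>>> // type info keeps GenericRecord on the Avro serializer instead of Kryo.
>>> FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
>>>         "my-topic",
>>>         ConfluentRegistryAvroDeserializationSchema.forGeneric(
>>>                 readerSchema, "http://registry-host:8081"),
>>>         consumerProperties);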
>>>
>>> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin <un...@mail.ru> wrote:
>>>
>>>> I tried this:
>>>>
>>>> 1. Schema (found in stackoverflow)
>>>>
>>>> class GenericRecordSchema implements
>>>>         KafkaDeserializationSchema<GenericRecord> {
>>>>
>>>>     private String registryUrl;
>>>>     private transient KafkaAvroDeserializer deserializer;
>>>>
>>>>     public GenericRecordSchema(String registryUrl) {
>>>>         this.registryUrl = registryUrl;
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>>>         return false;
>>>>     }
>>>>
>>>>     @Override
>>>>     public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
>>>>             throws Exception {
>>>>         checkInitialized();
>>>>         return (GenericRecord) deserializer.deserialize(
>>>>                 consumerRecord.topic(), consumerRecord.value());
>>>>     }
>>>>
>>>>     @Override
>>>>     public TypeInformation<GenericRecord> getProducedType() {
>>>>         return TypeExtractor.getForClass(GenericRecord.class);
>>>>     }
>>>>
>>>>     private void checkInitialized() {
>>>>         if (deserializer == null) {
>>>>             Map<String, Object> props = new HashMap<>();
>>>>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>                     registryUrl);
>>>>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>                     false);
>>>>             SchemaRegistryClient client =
>>>>                     new CachedSchemaRegistryClient(
>>>>                             registryUrl,
>>>>                             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>             deserializer = new KafkaAvroDeserializer(client, props);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> 2. Consumer
>>>>
>>>> private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
>>>>
>>>>     return new FlinkKafkaConsumer<>(
>>>>             topic,
>>>>             new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
>>>>             getConsumerProperties());
>>>> }
>>>>
>>>> But when I start the app, the following error is happen:
>>>>
>>>> com.esotericsoftware.kryo.KryoException:
>>>> java.lang.UnsupportedOperationException
>>>> Serialization trace:
>>>> reserved (org.apache.avro.Schema$Field)
>>>> fieldMap (org.apache.avro.Schema$RecordSchema)
>>>> schema (org.apache.avro.generic.GenericData$Record)
>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>>>>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>>>>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>>>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>>>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>>>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>>>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>>>>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>>>>         at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>>>>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>>>>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>>>> Caused by: java.lang.UnsupportedOperationException
>>>>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>>>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>>>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>         ... 26 more
>>>>
>>>>
>>>> This is not solved by:
>>>> env.getConfig().disableForceKryo();
>>>> env.getConfig().enableForceAvro();
>>>>
>>>>
>>>> Any idea?
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>>
>>>
>>>
