The 010 consumer extends the 09 one, so I'd guess whatever code is reporting
the error sees FlinkKafkaConsumer09 as the superclass of FlinkKafkaConsumer010
and uses that name in the message.
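To illustrate the guess: if the reporting code builds its message from the
superclass name rather than the runtime class name, an 010 instance shows up
as 09. Here's a plain-Java sketch of that. KafkaConsumer09, KafkaConsumer010
and describe() are hypothetical stand-ins, not the real Flink classes, and I
haven't confirmed this is exactly how Flink builds the message.

```java
// Hypothetical stand-ins for FlinkKafkaConsumer09 / FlinkKafkaConsumer010;
// no Flink classes are used here.
public class SuperclassNameDemo {
    static class KafkaConsumer09 implements java.io.Serializable {}

    static class KafkaConsumer010 extends KafkaConsumer09 {}

    // Builds an error message from the superclass name, the way I'm guessing
    // the reporting code does; an assumption, not Flink's actual source.
    static String describe(Object source) {
        return "The implementation of the "
                + source.getClass().getSuperclass().getSimpleName()
                + " is not serializable.";
    }

    public static void main(String[] args) {
        // Prints the 09 name even though we created the 010 subclass.
        System.out.println(describe(new KafkaConsumer010()));
    }
}
```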

I've seen this error a bunch, and it's because MyDeserializationSchema
isn't serializable: most likely one of its fields isn't serializable, or one
of the fields of its fields, and so on. Everything in the object graph has to
be serializable. In your case the "Caused by" line names
org.apache.avro.Schema$RecordSchema, so the culprit is the schema field.
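Here's a minimal plain-Java sketch of that object-graph rule and one common
workaround: keep the schema as its JSON string and rebuild the schema object
lazily in a transient field. FakeSchema stands in for org.apache.avro.Schema,
and all the class names are hypothetical. With the real Avro class I'd assume
you'd store schema.toString() and rebuild it with new Schema.Parser().parse(...),
but check that against your Avro version.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ObjectGraphDemo {
    // Hypothetical stand-in for org.apache.avro.Schema: deliberately NOT Serializable.
    static final class FakeSchema {
        final String json;

        FakeSchema(String json) { this.json = json; }
    }

    // Mirrors the failing design: a Serializable object holding a non-serializable field.
    static final class BrokenSchemaHolder implements Serializable {
        private final FakeSchema schema;

        BrokenSchemaHolder(FakeSchema schema) { this.schema = schema; }
    }

    // Workaround pattern: ship only the String, rebuild the schema on first use.
    static final class FixedSchemaHolder implements Serializable {
        private final String schemaJson;
        private transient FakeSchema schema; // skipped by Java serialization

        FixedSchemaHolder(FakeSchema schema) { this.schemaJson = schema.json; }

        FakeSchema schema() {
            if (schema == null) {
                schema = new FakeSchema(schemaJson); // rebuilt after deserialization
            }
            return schema;
        }
    }

    // Returns true if the whole object graph can be written with ObjectOutputStream.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        FakeSchema s = new FakeSchema("{\"type\":\"record\"}");
        System.out.println(isSerializable(new BrokenSchemaHolder(s))); // false
        System.out.println(isSerializable(new FixedSchemaHolder(s)));  // true
    }
}
```

The transient field plus lazy getter means each task rebuilds its own copy
after deserialization instead of needing the non-serializable object shipped
to it.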

Probably the easiest way to catch this is to write a unit test that checks
MyDeserializationSchema is serializable, essentially a test that
ObjectOutputStream.writeObject succeeds on it. That's a pretty useful test,
because you find out that a change to MyDeserializationSchema breaks the
runtime during the test phase instead of waiting until the deploy/run stage.
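A sketch of that test helper, using only the JDK: roundTrip writes the object
with ObjectOutputStream.writeObject and reads it back. SerializationCheck and
roundTrip are hypothetical names; in your project you'd call it from a JUnit
test with new MyDeserializationSchema(schema) instead of the demo objects in
main. Per the stack trace, Flink's ClosureCleaner does a similar check through
InstantiationUtil.serializeObject, so a failure here should predict the
failure you saw in addSource.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;

public class SerializationCheck {
    // Serializes the object with ObjectOutputStream.writeObject, reads it back,
    // and returns the copy; throws if any object in the graph is not serializable.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> ok = new ArrayList<>(Arrays.asList("a", "b"));
        System.out.println(roundTrip(ok)); // [a, b]

        try {
            roundTrip(new Object()); // java.lang.Object is not Serializable
            System.out.println("unexpected: serialized");
        } catch (NotSerializableException e) {
            System.out.println("not serializable: " + e.getMessage());
        }
    }
}
```

Reading the copy back also exercises readObject and any transient
re-initialization, which a write-only check would miss.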


On Fri, Mar 2, 2018 at 10:42 AM, Filipe Couto <filipe.cout...@gmail.com>
wrote:

> Hello,
>
> I have a few topics that I want to read from Kafka, which consist mainly
> of key-value pairs: timestamp (key) and value (byte array).
>
> The byte array doesn't really have a class to deserialize into, since the
> Avro record we have comes from a "SELECT * FROM..." that selects several
> SQL tables, and each topic represents one of those tables.
>
> We're using a GenericRecord, and since the topic name tells us the
> structure of the table, we know the column names and read them like
> this: genericRecord.get("COLUMN_NAME").toString()
>
> Given this, we're now trying to read a Kafka topic using Flink, and we
> have this:
>
> The environment is the StreamExecutionEnvironment, and the properties hold
> the Kafka serializer/deserializer settings and the Kafka and ZooKeeper
> IP addresses.
>
> class...
>
>         DataStream<Object> messageStream = environment
>                 .addSource(new FlinkKafkaConsumer010<>(baseTopic,
>                         new MyDeserializationSchema(schema), properties));
>
>         messageStream.print();
>
>         try {
>             environment.execute();
>         } catch (Exception e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>
>         return false;
>     }
> }
>
> class MyDeserializationSchema<T> implements DeserializationSchema<T> {
>     private static final Logger log =
>             LoggerFactory.getLogger(MyDeserializationSchema.class);
>
>     private final Class<T> avrotype =
>             (Class<T>) org.apache.avro.generic.GenericRecord.class;
>     private final Schema schema;
>
>     public MyDeserializationSchema(Schema schema) {
>         this.schema = schema;
>     }
>
>     @Override
>     public T deserialize(byte[] arg0) throws IOException {
>         log.info("Starting deserialization");
>         GenericRecord genericRecord;
>         Injection<GenericRecord, byte[]> recordInjection =
>                 GenericAvroCodecs.toBinary(schema);
>         log.info(recordInjection.toString());
>         genericRecord = recordInjection.invert(arg0).get();
>         log.info(genericRecord.toString());
>         return (T) genericRecord;
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return TypeExtractor.getForClass(avrotype);
>     }
> }
>
> Executing this on our server generates the following:
>
> [2018-03-02 15:59:37,111] WARN Ignoring configured key DeSerializer (key.deserializer) (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09)
>
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumer09 is not serializable. The object probably contains or references non serializable fields.
>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
>         at com.i2s.analytics.flink.executors.LKTreatyExecutor.execute(LKTreatyExecutor.java:153)
>         at com.i2s.analytics.flink.job.DependenciesConsumer.main(DependenciesConsumer.java:66)
> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
>         ... 6 more
>
>
> I can't understand why the logs refer to FlinkKafkaConsumer09 when
> we're using FlinkKafkaConsumer010.
> Also, how can we deserialize to a GenericRecord so we can access the
> record fields the way we do when we read a Kafka topic without Flink?
>
>
> Thanks in advance for any help.
>
>


-- 
Gordon Weakliem | Sr. Software Engineer
O 303.493.5490
Boulder | NYC | London


