Hello,

I have a few topics that I want to read from Kafka, each consisting mainly of
a key-value pair: a timestamp (key) and a value (byte array).

The byte array doesn't really have a class to deserialize into, since the
Avro record we have comes from a "SELECT * FROM..." that selects several
SQL tables, and each topic carries one of those tables.

We're using a GenericRecord, and since the topic name tells us which table
(and therefore which column names) we're dealing with, we can read fields
like this: genericRecord.get("COLUMN_NAME").toString()
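
For context, this is roughly how we read these records today without Flink
(here consumer is a KafkaConsumer<Long, byte[]> already subscribed to the
topic, and "COLUMN_NAME" is a placeholder for whatever column we look up):

// Plain Kafka consumer loop: the value is the Avro-encoded byte array,
// decoded with Twitter's bijection library into a GenericRecord.
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

while (running) {
    for (ConsumerRecord<Long, byte[]> record : consumer.poll(100)) {
        GenericRecord genericRecord = recordInjection.invert(record.value()).get();
        // The topic name tells us the table, so we know which columns exist.
        String value = genericRecord.get("COLUMN_NAME").toString();
    }
}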

Given this, we're now trying to read a Kafka topic using Flink, and we have
this:

The environment is a StreamExecutionEnvironment, and the properties hold the
Kafka deserializer settings plus the Kafka and ZooKeeper addresses.
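
Roughly like this (hosts are placeholders):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kafka-host:9092");
properties.setProperty("zookeeper.connect", "zk-host:2181");
properties.setProperty("group.id", "our-consumer-group");
// We also set the Kafka deserializers here, which is presumably what
// triggers the "Ignoring configured key DeSerializer" warning below,
// since Flink installs its own deserializers.
properties.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.LongDeserializer");
properties.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");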

class...

DataStream<GenericRecord> messageStream = environment
        .addSource(new FlinkKafkaConsumer010<>(baseTopic,
                new MyDeserializationSchema<GenericRecord>(schema), properties));

messageStream.print();

try {
    environment.execute();
} catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

return false;
}
}

class MyDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final Logger log =
            LoggerFactory.getLogger(MyDeserializationSchema.class);

    // We always produce GenericRecord, hence the cast.
    @SuppressWarnings("unchecked")
    private final Class<T> avroType = (Class<T>) GenericRecord.class;

    private final Schema schema;

    public MyDeserializationSchema(Schema schema) {
        this.schema = schema;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        log.info("Starting deserialization");
        // Decode the raw bytes back into a GenericRecord via bijection.
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs
                .toBinary(schema);
        log.info(recordInjection.toString());
        GenericRecord genericRecord = recordInjection.invert(message).get();
        log.info(genericRecord.toString());
        return (T) genericRecord;
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}

Executing this on our server generates the following:

[2018-03-02 15:59:37,111] WARN Ignoring configured key DeSerializer (key.deserializer) (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09)

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumer09 is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
        at com.i2s.analytics.flink.executors.LKTreatyExecutor.execute(LKTreatyExecutor.java:153)
        at com.i2s.analytics.flink.job.DependenciesConsumer.main(DependenciesConsumer.java:66)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
        ... 6 more


I can't understand why the logs refer to FlinkKafkaConsumer09 when we're
using the FlinkKafkaConsumer010 version.
And also, how can we deserialize to a GenericRecord so that we can access
the record fields the same way we do when reading the Kafka topic
without Flink?
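
From the "Caused by" line, it looks like the non-serializable field is our
Schema (org.apache.avro.Schema$RecordSchema), which the ClosureCleaner tries
to serialize along with the deserialization schema. Would something like the
following be the right fix: keeping the schema as a String (which is
serializable) and parsing it lazily on first use? This is just a sketch we
haven't tested, and the class name is only for the example:

class MySerializableDeserializationSchema implements DeserializationSchema<GenericRecord> {

    // Avro's Schema is not java.io.Serializable, so we ship its JSON
    // representation as a String and rebuild it on the task managers.
    private final String schemaString;
    private transient Schema schema;

    MySerializableDeserializationSchema(Schema schema) {
        this.schemaString = schema.toString();
    }

    @Override
    public GenericRecord deserialize(byte[] message) throws IOException {
        if (schema == null) {
            // First call on this instance after shipping: re-parse the schema.
            schema = new Schema.Parser().parse(schemaString);
        }
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        return recordInjection.invert(message).get();
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}

With that, addSource(new FlinkKafkaConsumer010<>(baseTopic, new
MySerializableDeserializationSchema(schema), properties)) should give us a
DataStream<GenericRecord> directly, if our understanding is correct.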


Thanks in advance for any help.
