I'm trying to integrate with schemaRegistry and SparkStreaming. By the moment I want to use GenericRecords. It seems that my producer works and new schemas are published in _schemas topic. When I try to read with my Consumer, I'm not able to deserialize the data.
How could I say to Spark that I'm going to deserializer to GenericRecord? public class SparkStreamingSchemaRegister { public static void main(String[] args) throws InterruptedException { String topic = "avro_example_schemaRegistry"; final JavaStreamingContext jssc = new JavaStreamingContext(getSparkConf(), Durations.milliseconds(Constants.STREAM_BATCH_DURATION_MILLIS)); final JavaInputDStream<ConsumerRecord<byte[], *GenericRecord*>> rawStream = KafkaSource.getKafkaDirectStream(jssc); rawStream.foreachRDD(rdd -> { JavaRDD<Client> javaRddClient = rdd.map( kafkaRecord -> { GenericRecord record = kafkaRecord.value(); --> ERROR return CrmClient.getCrmClient(kafkaRecord.value()); }); CassandraJavaUtil .javaFunctions(javaRddClient) .writerBuilder("keyspace", "client", CassandraJavaUtil.mapToRow(CrmClient.class)) .withColumnSelector(CassandraJavaUtil.someColumns("id", "name", "lastname")) .saveToCassandra(); }); jssc.start(); jssc.awaitTermination(); jssc.close(); } private static class KafkaSource { public static JavaInputDStream<ConsumerRecord<byte[], *GenericRecord*>> getKafkaDirectStream(JavaStreamingContext jssc) { JavaInputDStream<ConsumerRecord<byte[], *GenericRecord*>> stream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<byte[], *GenericRecord*>Subscribe(getKafkaTopic(), getKafkaConf())); return stream; } private static Map<String, Object> getKafkaConf() { Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_BOOTSTRAP_SERVERS.getValue(), Constants.KAFKA_BOOTSTRAP_SERVERS); kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_KEY_DESERIALIZER.getValue(), ByteArrayDeserializer.class); kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_GROUPID.getValue(), Constants.KAFKA_GROUP_ID); kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_ENABLE_AUTO_COMMIT.getValue(), false); kafkaParams.put(Constants.KAFKA_PROPERTIES.KAFKA_AUTO_OFFSET_RESET.getValue(), "earliest"); * kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,KafkaAvroDeserializer.class.getName());* kafkaParams.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false"); kafkaParams.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG," http://localhost:8081"); return kafkaParams; } } }