-- Flink SQL: Kafka source table for the Avro-encoded records on topic 'test'.
--
-- The 'avro' format is used without a schema registry, so the Avro schema used
-- for decoding is derived from this column list. Column nullability therefore
-- has to match the schema the producer wrote the records with.
--
-- name:    the producer schema declares "name" as a plain "string" (not a
--          union), so the column must be NOT NULL. A nullable column is decoded
--          as a union, which makes the reader interpret the string-length
--          prefix as a union branch index; this is the likely cause of
--          "Failed to deserialize Avro record ... ArrayIndexOutOfBoundsException: 6".
-- address: NOTE(review): the producer schema declares ["string", "null"], while
--          the union Flink derives for a nullable column is presumably
--          ["null", "string"]. If so, the branch order differs and the producer
--          schema should be changed to ["null", "string"] (or the
--          'avro-confluent' format used instead) - TODO confirm.
CREATE TABLE kafkaInputTable (
    name    STRING NOT NULL,
    address STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092',
    'format' = 'avro'
);
-- Kafka table on topic 'test1' that the job writes to; rows are encoded as CSV.
CREATE TABLE kafkaOutputTable (
    name    STRING,
    address STRING
)
WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'test1',
    'properties.bootstrap.servers' = 'node1:9092,node2:9092,node3:9092',
    'format'                       = 'csv'
);
-- Copy both columns of every row from kafkaInputTable into kafkaOutputTable.
INSERT INTO kafkaOutputTable
SELECT
    name,
    address
FROM kafkaInputTable;



Consuming the Avro-formatted data from topic test fails with: Failed to deserialize Avro
record ... ArrayIndexOutOfBoundsException: 6
{
  "namespace": "cc.unmi.data",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "address", "type": ["string", "null"]}
  ]
}



????????????????????????????????avro????????????sink??????????????

回复