Shengkai Fang created FLINK-17940:
-------------------------------------

             Summary: It will throw NullPointerException when write data with 
Avro format using new property key in SQL-Client 
                 Key: FLINK-17940
                 URL: https://issues.apache.org/jira/browse/FLINK-17940
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka, Table SQL / Client
    Affects Versions: 1.11.0
         Environment: Docker Environment:

zookeeper:
 image: wurstmeister/zookeeper:3.4.6
 ports:
 - "2181:2181"
 kafka:
 image: wurstmeister/kafka:2.12-2.2.1
 ports:
 - "9092:9092"
 - "9094:9094"
 depends_on:
 - zookeeper
 environment:
 - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9094,OUTSIDE://localhost:9092
 - KAFKA_LISTENERS=INSIDE://:9094,OUTSIDE://:9092
 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
 - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
 - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
 - KAFKA_CREATE_TOPICS:"order_cnt:1:1,orders:1:1,currency:1:1"
 volumes:
 - /var/run/docker.sock:/var/run/docker.sock
            Reporter: Shengkai Fang


For the following job:
{noformat}
create table csv( 
    user_name VARCHAR, is_new BOOLEAN, content VARCHAR
) with ( 
    'connector' = 'filesystem', 
    'path' = '/Users/ohmeatball/Work/flink-sql-etl/data-  
generator/src/main/resources/user.csv', 
    'format' = 'csv');
---------------------------------------------------------
CREATE TABLE AvroTest ( 
    user_name VARCHAR, is_new BOOLEAN, content VARCHAR
) WITH (
    'connector' = 'kafka', 'topic' = 'avro_from_csv',       
'properties.zookeeper.connect' = 'localhost:2181', 
'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 
'testGroup3', 'scan.startup.mode' = 'earliest-offset', 'format' = 'avro');
---------------------------------------------------------
insert into AvroTest select user_name, is_new, content from csv;
{noformat}
The exception stack is following:

 
{code:java}
2020-05-26 19:51:22,212 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - FileSystemTableSource(user_name, is_new, content) -> Sink: 
Sink(table=[default_catalog.default_database.AvroTest], fields=[user_name, 
is_new, content]) (1/1) (283a383f3ed93b051f56d4b5aca7dfb9) switched from 
RUNNING to FAILED.java.lang.RuntimeException: Failed to serialize row.    at 
org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:118)
 ~[flink-avro-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:63)
 ~[flink-avro-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:51)
 ~[flink-sql-connector-kafka_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:775)
 ~[flink-sql-connector-kafka_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
 ~[flink-sql-connector-kafka_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:86)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:352)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:318)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:222)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:285)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:205)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:196)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:553)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:720) 
[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:545) 
[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    at 
java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]Caused by: 
java.lang.NullPointerException    at 
org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:113)
 ~[flink-avro-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]    ... 26 more

{code}
Notice: Everything works fine with old property.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to