Elizabeth Bennett created KAFKA-6133:
Summary: NullPointerException in S3 Connector when using rotate.interval.ms
Key: KAFKA-6133
URL: https://issues.apache.org/jira/browse/KAFKA-6133
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 0.11.0.0
Reporter: Elizabeth Bennett
I just tried out the new rotate.interval.ms feature in the S3 connector to do
time-based flushing. I am getting this NPE on every event:
{{[2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
    at io.confluent.connect.s3.TopicPartitionWriter.rotateOnTime(TopicPartitionWriter.java:288)
    at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:234)
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
[2017-10-20 23:21:35,233] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2017-10-20 23:21:35,233] ERROR Task foo-to-S3-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)}}
I dug into the S3 connector code a bit and it looks like the
{{rotate.interval.ms}} feature only works if you are using the
TimeBasedPartitioner. It gets the TimestampExtractor from the
TimeBasedPartitioner to determine the timestamp of each event and uses that
for the time-based flushing. With any other partitioner there is presumably no
extractor to call, which would explain the NPE in rotateOnTime.
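To illustrate what I think is happening, here is a reduced, self-contained sketch of the pattern. The class and method names are simplified stand-ins I made up for illustration, not the connector's actual source; the only assumption carried over is that the writer takes its timestamp extractor from the partitioner and that a non-time-based partitioner has none.

```java
import java.util.function.ToLongFunction;

public class RotateOnTimeSketch {
    /** Hypothetical stand-in for a partitioner; only time-based ones supply an extractor. */
    public interface Partitioner {
        default ToLongFunction<String> timestampExtractor() {
            return null; // non-time-based partitioners have no extractor
        }
    }

    public static final class TimeBasedPartitioner implements Partitioner {
        @Override
        public ToLongFunction<String> timestampExtractor() {
            // For this sketch the record payload is simply its timestamp in ms.
            return payload -> Long.parseLong(payload);
        }
    }

    /** Hypothetical custom partitioner that inherits the null extractor. */
    public static final class CustomPartitioner implements Partitioner { }

    public static final class Writer {
        private final ToLongFunction<String> extractor;
        private final long rotateIntervalMs;
        private Long baseTimestamp;

        Writer(Partitioner partitioner, long rotateIntervalMs) {
            this.extractor = partitioner.timestampExtractor(); // null for CustomPartitioner
            this.rotateIntervalMs = rotateIntervalMs;
        }

        boolean rotateOnTime(String payload) {
            // Unguarded dereference: throws NullPointerException when extractor is null.
            long ts = extractor.applyAsLong(payload);
            if (baseTimestamp == null) {
                baseTimestamp = ts;
            }
            return ts - baseTimestamp >= rotateIntervalMs;
        }
    }

    /** Returns true if writing a single record with the given partitioner throws an NPE. */
    public static boolean throwsNpe(Partitioner partitioner) {
        try {
            new Writer(partitioner, 1000L).rotateOnTime("0");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("time-based partitioner NPE: " + throwsNpe(new TimeBasedPartitioner()));
        System.out.println("custom partitioner NPE: " + throwsNpe(new CustomPartitioner()));
    }
}
```

In this sketch only the custom partitioner triggers the exception, which matches what I see: the failure occurs on the first record written.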
I'm using a custom partitioner, but I'd still really like to use the
{{rotate.interval.ms}} feature, using wall clock time to determine the flushing
behavior.
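Roughly what I have in mind is a fallback to the wall clock whenever no timestamp extractor is available. The sketch below is only an illustration under that assumption; {{WallClockRotationSketch}}, {{shouldRotate}} and the injected clock are hypothetical names, not existing connector APIs.

```java
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;

public class WallClockRotationSketch {
    private final ToLongFunction<String> extractor; // may be null for custom partitioners
    private final LongSupplier wallClock;           // e.g. System::currentTimeMillis
    private final long rotateIntervalMs;
    private Long baseTimestamp;                     // start of the current rotation window

    public WallClockRotationSketch(ToLongFunction<String> extractor,
                                   LongSupplier wallClock,
                                   long rotateIntervalMs) {
        if (rotateIntervalMs <= 0) {
            throw new IllegalArgumentException("rotate interval must be positive");
        }
        this.extractor = extractor;
        this.wallClock = wallClock;
        this.rotateIntervalMs = rotateIntervalMs;
    }

    /** Returns true when the current window has been open for at least the interval. */
    public boolean shouldRotate(String payload) {
        // Use the record timestamp when an extractor exists, otherwise the wall clock.
        long now = (extractor != null)
                ? extractor.applyAsLong(payload)
                : wallClock.getAsLong();
        if (baseTimestamp == null) {
            baseTimestamp = now; // first record opens the window
            return false;
        }
        if (now - baseTimestamp >= rotateIntervalMs) {
            baseTimestamp = now; // start a new window after rotating
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L}; // controllable clock so the example is deterministic
        WallClockRotationSketch writer =
                new WallClockRotationSketch(null, () -> fakeNow[0], 1000L);
        System.out.println(writer.shouldRotate("a"));
        fakeNow[0] = 999L;
        System.out.println(writer.shouldRotate("b"));
        fakeNow[0] = 1000L;
        System.out.println(writer.shouldRotate("c"));
    }
}
```

Injecting the clock as a {{LongSupplier}} is just a convenience for testing; a real implementation would more likely pass {{System::currentTimeMillis}} or whatever time abstraction the connector already uses.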
I'd be willing to work on fixing this issue, but I want to confirm that it is
actually a bug, and not that it was specifically designed to only work with the
TimeBasedPartitioner. Even if it is the latter, it should probably not crash
with an NPE.