[ 
https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17269064#comment-17269064
 ] 

kaushik srinivas edited comment on KAFKA-12164 at 1/21/21, 6:31 AM:
--------------------------------------------------------------------

Regarding the comment below:

"Now, by reading quickly the javadoc for FileSystem#mkdirs in HDFS I understand 
that a nested directory can be constructed from a given path, even if some of 
the parents exist. Which makes me think that a restart would allow the creation 
of the full path as long as the partitioner is deterministic with respect to 
the record (i.e. for a given record it always creates the same path). Upon a 
restart the export of a record will be retried, so the partitioner could 
possibly reattempt the full creation of the path. But that's an early 
assumption from my side. "

 

Let's suppose the partitioner derives the directory /test1/test2 for a given 
record, and the connector crashes during the directory creation, so HDFS ends 
up with only the /test1/ folder.

But there is no guarantee that the Kafka record which was not fully committed 
by the connector would still exist in Kafka after restart/recovery of the 
connector. So there could be scenarios where these partial directories remain 
partial forever and require additional manual intervention to clean them up 
or complete their creation in HDFS. Without that, we see issues with Hive 
table partitions if Hive integration is enabled.
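
To make the manual clean-up step concrete, here is a rough sketch of how such 
partial partition directories could be detected. It is only an illustration 
under stated assumptions: it uses java.nio.file on a local directory tree as a 
stand-in for HDFS (it does not call the Hadoop FileSystem API), the class and 
method names are hypothetical, and "incomplete" is a heuristic meaning a 
directory with no sub-directories that sits above the expected partition depth.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of kafka-connect-hdfs.
final class PartialPartitionScanner {

    // True if 'dir' contains at least one sub-directory.
    static boolean hasSubdirectory(Path dir) throws IOException {
        try (Stream<Path> children = Files.list(dir)) {
            return children.anyMatch(Files::isDirectory);
        }
    }

    // Returns directories below 'root' that have no sub-directories and sit at
    // a depth smaller than 'expectedDepth' (the number of partition fields).
    static List<Path> findIncomplete(Path root, int expectedDepth) throws IOException {
        List<Path> dirs;
        try (Stream<Path> walk = Files.walk(root, expectedDepth)) {
            dirs = walk.filter(Files::isDirectory).sorted().collect(Collectors.toList());
        }
        List<Path> incomplete = new ArrayList<>();
        for (Path dir : dirs) {
            int depth = dir.getNameCount() - root.getNameCount();
            if (depth >= 1 && depth < expectedDepth && !hasSubdirectory(dir)) {
                incomplete.add(dir);
            }
        }
        return incomplete;
    }

    public static void main(String[] args) throws IOException {
        // Local temp directory standing in for the HDFS topic directory.
        Path root = Files.createTempDirectory("hdfs-sim");
        Files.createDirectories(root.resolve("test1=0").resolve("test2=0"));
        Files.createDirectories(root.resolve("test1=1")); // partial partition
        Files.createDirectories(root.resolve("test1=2").resolve("test2=2"));

        for (Path p : findIncomplete(root, 2)) {
            System.out.println(root.relativize(p)); // prints "test1=1"
        }
    }
}
```

A real check against HDFS would need the equivalent listing calls of the 
Hadoop client, and would still have to decide whether to delete the partial 
directory or complete it.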



> Issue when kafka connect worker pod restart, during creation of nested 
> partition directories in hdfs file system.
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12164
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: kaushik srinivas
>            Priority: Critical
>
> In our production labs, the following issue was observed. The sequence of 
> events is given below.
>  # The hdfs connector is added to the connect worker.
>  # The hdfs connector creates folders in hdfs, e.g. /test1=1/test2=2/, 
> based on the custom partitioner. Here test1 and test2 are two separate nested 
> directories derived from multiple fields in the record using a custom 
> partitioner.
>  # The kafka connect hdfs connector uses the function call below to create 
> the directories in the hdfs file system.
> fs.mkdirs(new Path(filename));
> ref: 
> [https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java]
> The important thing to note is that if mkdirs() is a non-atomic operation 
> (i.e. it can result in partial execution if interrupted), 
> then suppose the first directory, i.e. test1, is created and the connect 
> worker pod restarts just before test2 is created in hdfs. 
> The hdfs file system will then be left with partially created partition 
> folders for the restart time frame.
> So we might have conditions in hdfs as below
> /test1=0/test2=0/
> /test1=1/
> /test1=2/test2=2
> /test1=3/test2=3
> So the second partition has a missing directory in it. And if hive 
> integration is enabled, hive metastore exceptions will occur, since a 
> partition expected by the hive table is missing in hdfs for a few partitions.
> *This can occur in any connector that has a non-atomic operation in progress 
> when a restart of the kafka connect worker pod is triggered. This will leave 
> the system in a partially completed state and may make it hard for the 
> connector to continue its operation.*
> *This is a very critical issue and needs some attention on ways for handling 
> the same.*
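
The scenario in the description can be imitated with a small sketch. It 
assumes, as the report does, that nested creation may stop part-way. It runs 
against a local temp directory with java.nio.file rather than HDFS, and the 
class and method names are hypothetical. It shows the partial state after an 
interruption, and that repeating the creation with the same path completes it, 
which only helps if the same record is actually processed again.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical simulation, not the connector's code.
final class PartialMkdirsDemo {

    // Creates the components of 'relative' under 'root' one level at a time
    // and stops after 'maxLevels' levels, imitating a worker killed mid-way.
    static Path createLevels(Path root, Path relative, int maxLevels) throws IOException {
        Path current = root;
        int created = 0;
        for (Path component : relative) {
            if (created == maxLevels) {
                break; // simulated restart before the remaining levels exist
            }
            current = current.resolve(component);
            if (!Files.isDirectory(current)) {
                Files.createDirectory(current);
            }
            created++;
        }
        return current;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("hdfs-sim");
        Path partition = Paths.get("test1=1", "test2=2");

        // Interrupted after the first level: only test1=1 exists.
        createLevels(root, partition, 1);
        System.out.println(Files.isDirectory(root.resolve("test1=1"))); // true
        System.out.println(Files.isDirectory(root.resolve(partition))); // false

        // Retrying with the same deterministic path fills in the missing level.
        Files.createDirectories(root.resolve(partition));
        System.out.println(Files.isDirectory(root.resolve(partition))); // true
    }
}
```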



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
