[ https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17269087#comment-17269087 ]

kaushik srinivas commented on KAFKA-12164:
------------------------------------------

Adding a few major concerns regarding the feedback about re-creating the 
corrupt directories upon restart:

The function syncWithHive() (DataWriter.java) is called at every restart/boot-up 
of the connector. It performs an initial audit of all the partition directories 
and tries to sync the hdfs folders with the hive partitions before proceeding 
to consume records from kafka.

Below is the relevant snippet.

 
{code:java}
// Fetch all partitions Hive currently knows about for this topic's table.
List<String> partitions =
    hiveMetaStore.listPartitions(hiveDatabase, topicTableMap.get(topic), (short) -1);
// Scan HDFS for partition directories under the topic directory.
FileStatus[] statuses = FileUtils.getDirectories(storage, new Path(topicDir));
for (FileStatus status : statuses) {
  String location = status.getPath().toString();
  if (!partitions.contains(location)) {
    // Directory exists in HDFS but not in Hive: register it as a new partition.
    String partitionValue = getPartitionValue(location);
    hiveMetaStore.addPartition(hiveDatabase, topicTableMap.get(topic), partitionValue);
  }
}
{code}
Now going one step deeper, into getDirectories > getDirectoriesImpl (in 
FileUtils): a path is returned as a partition path if
a. the path is a directory, and
b. the path contains no nested directories (checked by verifying that the 
number of non-directory files equals the total number of (directory + 
non-directory) files in the path).

If both conditions are met, the path is added as a partition path.
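
A minimal sketch of that leaf-directory check, assuming Hadoop's FileSystem 
API (the class and method names here are illustrative, not the exact FileUtils 
code):

{code:java}
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LeafDirectoryCheck {
  // Illustrative only: true when 'path' is a directory whose entries are all
  // regular files, i.e. the condition getDirectoriesImpl effectively checks.
  static boolean isLeafPartitionDirectory(FileSystem fs, Path path) throws IOException {
    if (!fs.getFileStatus(path).isDirectory()) {
      return false;                       // condition (a): must be a directory
    }
    FileStatus[] entries = fs.listStatus(path);
    int nonDirectoryCount = 0;
    for (FileStatus entry : entries) {
      if (!entry.isDirectory()) {
        nonDirectoryCount++;
      }
    }
    // Condition (b): the number of non-directory entries equals the total
    // entry count, i.e. there are no nested directories. Note that an *empty*
    // directory (0 == 0) also passes, which is exactly how the corrupt
    // /test1=0/ case described below slips through.
    return nonDirectoryCount == entries.length;
  }
}
{code}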

So in the erroneous case, where the actual path is supposed to look like
/test1=0/test2=0/xxxxxxx.parquet
but, due to a crash, instead looks like
/test1=0/

the path /test1=0 satisfies conditions a and b above and is therefore returned 
as a new partition path to be registered with hive. That update fails, because 
hive expects the partition to be /test1=0/test2=0 and not /test1=0/.
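
A hypothetical illustration of the mismatch (the partition columns and derived 
values below are assumptions based on the scenario above, not actual connector 
output):

{code:java}
import java.util.List;

public class PartitionMismatchExample {
  public static void main(String[] args) {
    // The Hive table is partitioned by two columns, per the scenario above.
    List<String> tablePartitionColumns = List.of("test1", "test2");

    String fromCorruptPath = "test1=0";          // derived from /test1=0/
    String fromHealthyPath = "test1=0/test2=0";  // derived from /test1=0/test2=0/

    // addPartition can only succeed when the key=value segment count matches
    // the table's partition column count; the corrupt path yields 1 vs. 2.
    System.out.println(fromCorruptPath.split("/").length
        == tablePartitionColumns.size());  // false -> Hive exception at sync time
    System.out.println(fromHealthyPath.split("/").length
        == tablePartitionColumns.size());  // true  -> partition added cleanly
  }
}
{code}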

 

This would mean that once there is a corrupt partition directory in hdfs, the 
syncWithHive() call will keep throwing hive exceptions at every restart of the 
connector until the directory is corrected in hdfs. The stage of re-consuming 
the old (failed-to-commit) records (even assuming they are still present in 
kafka after the restart) is therefore never reached; the connector remains in 
a crashed state indefinitely and requires manual clean-up intervention.

-kaushik

 

> Issue when kafka connect worker pod restarts during creation of nested 
> partition directories in the hdfs file system.
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12164
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: kaushik srinivas
>            Priority: Critical
>
> In our production labs, an issue was observed. Below is the sequence of 
> events.
>  # The hdfs connector is added to the connect worker.
>  # The hdfs connector creates folders in hdfs such as /test1=1/test2=2/, 
> based on a custom partitioner. Here test1 and test2 are two separate nested 
> directories derived from multiple fields in the record using the custom 
> partitioner.
>  # The kafka connect hdfs connector uses the call below to create the 
> directories in the hdfs file system:
> fs.mkdirs(new Path(filename));
> ref: 
> [https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java]
> Now the important thing to note is that if mkdirs() is a non-atomic operation 
> (i.e. it can result in partial execution if interrupted), then suppose the 
> first directory, test1, is created, and the connect worker pod restarts just 
> before test2 is created in hdfs. The hdfs file system is then left with 
> partially created partition folders from the restart window.
> So we might have conditions in hdfs as below
> /test1=0/test2=0/
> /test1=1/
> /test1=2/test2=2
> /test1=3/test2=3
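> A minimal sketch of the crash window (hypothetical level-by-level creation, 
> shown for clarity; the real HdfsStorage issues a single fs.mkdirs call whose 
> atomicity is the open question here):
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> public class CrashWindowSketch {
>   public static void main(String[] args) throws Exception {
>     FileSystem fs = FileSystem.get(new Configuration());
>     fs.mkdirs(new Path("/test1=1"));          // level one now exists
>     // <-- a worker pod restart here leaves /test1=1/ behind, while
>     //     /test1=1/test2=2/ is never created
>     fs.mkdirs(new Path("/test1=1/test2=2"));  // not reached after a crash
>   }
> }
> {code}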
> So the second partition has a missing directory in it. If hive integration 
> is enabled, hive metastore exceptions will occur, since a partition directory 
> expected by the hive table is missing for a few of the partitions in hdfs.
> *This can occur with any connector that has an ongoing non-atomic operation 
> when a restart is triggered on the kafka connect worker pod. The restart will 
> leave partially completed state in the system and may prevent the connector 
> from continuing its operation.*
> *This is a very critical issue and needs attention on ways of handling it.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
