dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-724725724


   Hi @vvcephei,
   
   Here is the fix. The integration tests broke because 
`KafkaStreams#cleanUp` can be called regardless of whether the `StreamThread`s 
have terminated.
   
   That is, if `KafkaStreams#cleanUp` is called while the `KafkaStreams` 
instance is still running, the `{state-store-directory}/{application-id}` 
directory is deleted and the `StreamThread` crashes with an exception from 
`StateDirectory#directoryForTask`, because it can no longer create the task 
state store directory, i.e., `{state-store-directory}/{application-id}/{task-id}`.
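
   The failure mode described above can be reproduced with plain 
`java.io.File`, outside of Kafka. This is only a minimal sketch, not the 
Streams code: the class name, the `my-app` directory, and the `0_0` task id 
are placeholders chosen for illustration, and `deleteRecursively` merely 
stands in for what `cleanUp` does to the application directory.

```java
import java.io.File;

class CleanUpRaceSketch {
    // Stand-in for the effect of cleanUp on the application directory:
    // recursively delete a file or directory.
    static void deleteRecursively(File f) {
        File[] children = f.listFiles(); // null when f is not a directory
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        f.delete();
    }

    // Returns whether a task directory could be created with mkdir()
    // after the application directory was removed underneath it.
    static boolean mkdirAfterCleanUp() {
        File stateDir = new File(System.getProperty("java.io.tmpdir"),
                "cleanup-race-" + System.nanoTime());
        File appDir = new File(stateDir, "my-app");
        appDir.mkdirs();            // application directory exists at startup
        deleteRecursively(appDir);  // "cleanUp" runs while a thread is still alive
        // mkdir() creates only the last path element, so it fails here
        // because the parent directory no longer exists.
        boolean created = new File(appDir, "0_0").mkdir();
        deleteRecursively(stateDir);
        return created;
    }

    public static void main(String[] args) {
        System.out.println("mkdir succeeded: " + mkdirAfterCleanUp());
    }
}
```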
   
   Moreover, the `StateDirectory#directoryForTask` method has two additional 
weaknesses:
   
   1. When it creates the task state store directory, it does not create the 
parent directories automatically: if `{state-store-directory}/{application-id}` 
does not exist, `taskDir.mkdir()` returns false and the method throws an 
exception. Because this behavior breaks the integration tests, I changed 
`taskDir.mkdir()` to `taskDir.mkdirs()` so that 
`{state-store-directory}/{application-id}` is created automatically. 
   2. The method only checks whether a `File` exists at 
`{state-store-directory}/{application-id}/{task-id}`, regardless of whether it 
is actually a directory. I added a check for this case and updated 
`StateDirectoryTest#shouldThrowProcessorStateException` accordingly.
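
   The two changes can be sketched roughly as follows. This is a simplified, 
hypothetical helper and not the actual `StateDirectory#directoryForTask` 
implementation: the class name, `freshAppDir`, and the directory names are 
made up for the example, and `IllegalStateException` is used as a stand-in 
for the exception type that Streams throws.

```java
import java.io.File;
import java.io.IOException;

class TaskDirSketch {
    // Hypothetical helper: returns the task directory, creating it and any
    // missing parent directories, and rejects a non-directory at that path.
    static File directoryForTask(File appDir, String taskId) {
        File taskDir = new File(appDir, taskId);
        if (taskDir.exists()) {
            // Point 2: something exists at the path, so make sure it is a directory.
            if (!taskDir.isDirectory()) {
                throw new IllegalStateException(taskDir + " exists but is not a directory");
            }
            return taskDir;
        }
        // Point 1: mkdirs() also creates missing parents, unlike mkdir().
        // The isDirectory() re-check tolerates another thread creating it first.
        if (!taskDir.mkdirs() && !taskDir.isDirectory()) {
            throw new IllegalStateException("could not create " + taskDir);
        }
        return taskDir;
    }

    // Builds a path under the system temp directory without creating it.
    static File freshAppDir() {
        File stateDir = new File(System.getProperty("java.io.tmpdir"),
                "task-dir-sketch-" + System.nanoTime());
        return new File(stateDir, "my-app");
    }

    public static void main(String[] args) throws IOException {
        File appDir = freshAppDir(); // does not exist on disk yet
        System.out.println(directoryForTask(appDir, "0_0").isDirectory());

        File notADir = new File(appDir, "0_1");
        notADir.createNewFile(); // a regular file where a task directory is expected
        try {
            directoryForTask(appDir, "0_1");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Checking `isDirectory()` before returning keeps a stray regular file from 
being treated as a valid task directory, which would otherwise only fail 
later when a store tries to write into it.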


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

