GitHub user Susmit07 closed a discussion: Issue while reading hdfs file post
Kerberos Authn
Hi Developers,
We are facing an issue when using Apache Pekko connectors wrapped in a
`UserGroupInformation.doAs` block for Kerberos authentication. The same HDFS
operation works fine in a single-threaded Scala program without Pekko
connectors, but fails when run inside Pekko Streams. Are there any known
limitations or configurations needed for Pekko connectors when dealing with
Kerberos-authenticated HDFS access or concurrent UGI contexts?
```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

def main(args: Array[String]): Unit = {
  val hdfsPath = "hdfs://<dir=path>"
  println("Path>>" + hdfsPath)
  val hdfsConf = KerberosHadoopAuthenticator.hadoopConfig
  val ugi: UserGroupInformation =
    KerberosHadoopAuthenticator.getAuthenticatedUGI(None, hdfsConf)
  ugi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = {
      println("Inside run >>")
      val fs = FileSystem.get(hdfsConf)
      println("fs initialized >>")
      val files = fs.listStatus(new Path(hdfsPath))
      files.foreach(fileStatus => println(fileStatus.getPath.toString))
      fs.close()
    }
  })
}
```
The above code runs fine and prints:
```
Inside run >>
[2024-10-07 19:58:41,503] [WARN]
[org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory] [] [main] - The
short-circuit local reads feature cannot be used because libhadoop cannot be
loaded. MDC: {}
fs initialized >>
hdfs://nnproxies/user/svc-segscores-dev/cef-nxt/sample.parquet
```
Now, when accessing via Pekko Streams:
```scala
override def listFiles(sourceDirectory: String): Future[List[String]] =
  ugi.doAs(new PrivilegedExceptionAction[Future[List[String]]] {
    override def run(): Future[List[String]] =
      listParquetFilesInDirectory(sourceDirectory)
  })
```
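A detail that may matter here (an assumption worth checking, not a confirmed diagnosis): `UserGroupInformation.doAs` attaches the Kerberos `Subject` to the calling thread only, and it returns as soon as the `Future` is constructed, while the `Future` body runs later on a Pekko dispatcher thread with no `Subject` attached. The pure-Scala sketch below illustrates that scoping with a `ThreadLocal` standing in for the real `Subject`; `DoAsScopeDemo`, `doAs`, and `demo` are hypothetical names, not Hadoop or Pekko API:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DoAsScopeDemo {
  // Stand-in for the Subject that ugi.doAs attaches: like the real one,
  // it is confined to the thread that calls doAs.
  private val currentUser = new ThreadLocal[String]

  // Analogue of ugi.doAs: bind the "credentials" on the current thread,
  // only for the duration of `body`.
  def doAs[T](user: String)(body: => T): T = {
    currentUser.set(user)
    try body
    finally currentUser.remove()
  }

  // Returns (user seen when doAs wraps Future construction,
  //          user seen when doAs runs inside the Future body).
  def demo(): (Option[String], Option[String]) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(pool)
    try {
      // As in the question: doAs returns once the Future is *created*;
      // the body later runs on a pool thread where no user is bound.
      val outside = doAs("svc-user") { Future { Option(currentUser.get) } }
      // The contrasting pattern: bind the user on the thread doing the work.
      val inside = Future { doAs("svc-user") { Option(currentUser.get) } }
      (Await.result(outside, 5.seconds), Await.result(inside, 5.seconds))
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val (outside, inside) = demo()
    println(s"doAs around Future construction: $outside") // None
    println(s"doAs inside the Future body:     $inside")  // Some(svc-user)
  }
}
```

If this scoping is the cause, moving the `doAs` call inside the code that actually executes on the dispatcher thread (or running all Hadoop calls on a dedicated execution context that performs the `doAs`) would be the usual workaround.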
```scala
private def listParquetFilesInDirectory(directory: String): Future[List[String]] =
  Future {
    try {
      val status = fs.listStatus(new HadoopPath(directory)).toList
      status.flatMap { fileStatus =>
        if (fileStatus.isDirectory) {
          listParquetFilesInDirectory(fileStatus.getPath.toString).value.get.get
        } else if (fileStatus.isFile &&
            fileStatus.getPath.toString.toLowerCase.endsWith(".parquet")) {
          Some(fileStatus.getPath.toString)
        } else {
          None
        }
      }
    } catch {
      case _: FileNotFoundException =>
        logger.error(s"Directory not found: $directory")
        List.empty[String]
      case ex: Exception =>
        logger.error(s"Error accessing directory: ${ex.getMessage}")
        List.empty[String]
    }
  }
```
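Separately from the Kerberos question, `listParquetFilesInDirectory(...).value.get.get` throws `NoSuchElementException` whenever the inner `Future` has not completed yet, so the recursion is fragile even with working authentication. Below is a minimal sketch of the same traversal composed with `flatMap`/`Future.sequence` instead; the in-memory `tree` map and the names `RecursiveListingSketch`/`listParquet` are hypothetical stand-ins for `fs.listStatus`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecursiveListingSketch {
  // Hypothetical in-memory stand-in for fs.listStatus: a directory maps
  // to its children; anything not in the map is a plain file.
  val tree: Map[String, List[String]] = Map(
    "/data"     -> List("/data/a.parquet", "/data/sub", "/data/notes.txt"),
    "/data/sub" -> List("/data/sub/b.parquet")
  )

  def isDirectory(path: String): Boolean = tree.contains(path)

  // The recursion is composed with flatMap/Future.sequence, so no Future
  // is ever forced with `.value.get.get` before it has completed.
  def listParquet(dir: String): Future[List[String]] =
    Future(tree.getOrElse(dir, Nil)).flatMap { children =>
      Future
        .sequence(children.map {
          case d if isDirectory(d)         => listParquet(d)
          case f if f.endsWith(".parquet") => Future.successful(List(f))
          case _                           => Future.successful(Nil)
        })
        .map(_.flatten)
    }

  def main(args: Array[String]): Unit =
    println(Await.result(listParquet("/data"), 5.seconds))
}
```

Applied to the real code, the same shape would replace the `.value.get.get` call; it does not address the Kerberos failure by itself, but it removes a second source of errors.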
The above function fails with the error below:
```
[2024-10-07 17:22:02,403] [WARN] [org.apache.hadoop.ipc.Client] []
[cef-datasync-downloader-pekko.actor.default-dispatcher-21] - Exception
encountered while connecting to the server :
org.apache.hadoop.security.AccessControlException: Client cannot authenticate
via:[TOKEN, KERBEROS] MDC: {}
[2024-10-07 17:22:02,418] [WARN] [org.apache.hadoop.ipc.Client] []
[cef-datasync-downloader-pekko.actor.default-dispatcher-21] - Exception
encountered while connecting to the server :
org.apache.hadoop.security.AccessControlException: Client cannot authenticate
via:[TOKEN, KERBEROS] MDC: {}
[2024-10-07 17:22:02,418] [INFO]
[org.apache.hadoop.io.retry.RetryInvocationHandler] []
[cef-datasync-downloader-pekko.actor.default-dispatcher-21] -
java.io.IOException: DestHost:destPort hdd0935.global.tesco.org:8890 ,
LocalHost:localPort cef-datasync-downloader-575b5d755c-57mjc/10.42.2.113:0.
Failed on local exception: java.io.IOException:
org.apache.hadoop.security.AccessControlException: Client cannot authenticate
via:[TOKEN, KERBEROS], while invoking
ClientNamenodeProtocolTranslatorPB.getListing over
hdd0935.global.tesco.org/172.17.121.235:8890 after 1 failover attempts. Trying
to failover after sleeping for 650ms. MDC: {}
```
Requesting guidance here - @pjfanning
GitHub link: https://github.com/apache/pekko-connectors/discussions/858
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to:
[email protected]