Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-09 Thread via GitHub


pvillard31 commented on PR #8495:
URL: https://github.com/apache/nifi/pull/8495#issuecomment-2044659154

   @mattyb149 - I merged this into main but forgot to amend the commit and add 
the magic words to close the PR, so closing manually. Also it does not apply 
cleanly to 1.x can you open a PR for the support branch? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-09 Thread via GitHub


pvillard31 closed pull request #8495: NIFI-12889: Retry Kerberos login on auth 
failure in HDFS processors
URL: https://github.com/apache/nifi/pull/8495


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-08 Thread via GitHub


Lehel44 commented on PR #8495:
URL: https://github.com/apache/nifi/pull/8495#issuecomment-2043896737

   @mattyb149 Thanks for making these improvements! LGTM+1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-05 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1554481562


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java:
##
@@ -108,6 +109,8 @@ protected void processBatchOfFiles(final List files, 
final ProcessContext
 if (!keepSourceFiles && !hdfs.delete(file, false)) {
 logger.warn("Unable to delete path " + file.toString() + " 
from HDFS.  Will likely be picked up over and over...");
 }
+} catch (final IOException e) {

Review Comment:
   I think in the default exception handling the 
   
   ```java
   session.rollback();
   context.yield();
   ```
   was removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-05 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1554480782


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:
##
@@ -352,95 +355,95 @@ protected void processBatchOfFiles(final List 
files, final ProcessContext
 
 for (final Path file : files) {
 
-ugi.doAs(new PrivilegedAction() {
-@Override
-public Object run() {
-FlowFile flowFile = session.create(parentFlowFile);
-try {
-final String originalFilename = file.getName();
-final Path outputDirPath = getNormalizedPath(context, 
OUTPUT_DIRECTORY, parentFlowFile);
-final Path newFile = new Path(outputDirPath, 
originalFilename);
-final boolean destinationExists = hdfs.exists(newFile);
-// If destination file already exists, resolve that
-// based on processor configuration
-if (destinationExists) {
-switch (processorConfig.getConflictResolution()) {
-case REPLACE_RESOLUTION:
-// Remove destination file (newFile) to 
replace
-if (hdfs.delete(newFile, false)) {
-getLogger().info("deleted {} in order 
to replace with the contents of {}",
-new Object[]{newFile, 
flowFile});
-}
-break;
-case IGNORE_RESOLUTION:
-session.transfer(flowFile, REL_SUCCESS);
-getLogger().info(
-"transferring {} to success 
because file with same name already exists",
-new Object[]{flowFile});
-return null;
-case FAIL_RESOLUTION:
-
session.transfer(session.penalize(flowFile), REL_FAILURE);
-getLogger().warn(
-"penalizing {} and routing to 
failure because file with same name already exists",
-new Object[]{flowFile});
-return null;
-default:
-break;
-}
+ugi.doAs((PrivilegedAction) () -> {

Review Comment:
   I think the patch didn't apply correctly. The NullPointerException still 
occurs. In the processBatchOfFiles method's  catch should look like this:
   
   ```java
   catch (final Throwable t) {
   final Optional causeOptional = 
findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == 
gsse.getMajor());
   if (causeOptional.isPresent()) {
   throw new UncheckedIOException(new 
IOException(causeOptional.get()));
   }
   getLogger().error("Failed to rename on HDFS due to {}", 
new Object[]{t});
   session.transfer(session.penalize(flowFile), 
REL_FAILURE);
   context.yield();
   }
   ```
   
   In case we roll back the session here in the for loop, the 
NullPointerException appears, that's why we handle the exception outside of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-05 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1554480782


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:
##
@@ -352,95 +355,95 @@ protected void processBatchOfFiles(final List 
files, final ProcessContext
 
 for (final Path file : files) {
 
-ugi.doAs(new PrivilegedAction() {
-@Override
-public Object run() {
-FlowFile flowFile = session.create(parentFlowFile);
-try {
-final String originalFilename = file.getName();
-final Path outputDirPath = getNormalizedPath(context, 
OUTPUT_DIRECTORY, parentFlowFile);
-final Path newFile = new Path(outputDirPath, 
originalFilename);
-final boolean destinationExists = hdfs.exists(newFile);
-// If destination file already exists, resolve that
-// based on processor configuration
-if (destinationExists) {
-switch (processorConfig.getConflictResolution()) {
-case REPLACE_RESOLUTION:
-// Remove destination file (newFile) to 
replace
-if (hdfs.delete(newFile, false)) {
-getLogger().info("deleted {} in order 
to replace with the contents of {}",
-new Object[]{newFile, 
flowFile});
-}
-break;
-case IGNORE_RESOLUTION:
-session.transfer(flowFile, REL_SUCCESS);
-getLogger().info(
-"transferring {} to success 
because file with same name already exists",
-new Object[]{flowFile});
-return null;
-case FAIL_RESOLUTION:
-
session.transfer(session.penalize(flowFile), REL_FAILURE);
-getLogger().warn(
-"penalizing {} and routing to 
failure because file with same name already exists",
-new Object[]{flowFile});
-return null;
-default:
-break;
-}
+ugi.doAs((PrivilegedAction) () -> {

Review Comment:
   I think the patch didn't apply correctly. The NullPointerException still 
occurs. In the processBatchOfFiles method's  catch should look like this:
   
   ```java
   catch (final Throwable t) {
   final Optional causeOptional = 
findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == 
gsse.getMajor());
   if (causeOptional.isPresent()) {
   throw new UncheckedIOException(new 
IOException(causeOptional.get()));
   }
   getLogger().error("Failed to rename on HDFS due to {}", 
new Object[]{t});
   session.transfer(session.penalize(flowFile), 
REL_FAILURE);
   context.yield();
   }
   ```



##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:
##
@@ -352,95 +355,95 @@ protected void processBatchOfFiles(final List 
files, final ProcessContext
 
 for (final Path file : files) {
 
-ugi.doAs(new PrivilegedAction() {
-@Override
-public Object run() {
-FlowFile flowFile = session.create(parentFlowFile);
-try {
-final String originalFilename = file.getName();
-final Path outputDirPath = getNormalizedPath(context, 
OUTPUT_DIRECTORY, parentFlowFile);
-final Path newFile = new Path(outputDirPath, 
originalFilename);
-final boolean destinationExists = hdfs.exists(newFile);
-// If destination file already exists, resolve that
-// based on processor configuration
-if (destinationExists) {
-switch (processorConfig.getConflictResolution()) {
-case REPLACE_RESOLUTION:
-// Remove destination file (newFile) to 
replace
-if (hdfs.delete(newFile, false)) {
-getLogger().info("deleted 

Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-05 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1554480782


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:
##
@@ -352,95 +355,95 @@ protected void processBatchOfFiles(final List 
files, final ProcessContext
 
 for (final Path file : files) {
 
-ugi.doAs(new PrivilegedAction() {
-@Override
-public Object run() {
-FlowFile flowFile = session.create(parentFlowFile);
-try {
-final String originalFilename = file.getName();
-final Path outputDirPath = getNormalizedPath(context, 
OUTPUT_DIRECTORY, parentFlowFile);
-final Path newFile = new Path(outputDirPath, 
originalFilename);
-final boolean destinationExists = hdfs.exists(newFile);
-// If destination file already exists, resolve that
-// based on processor configuration
-if (destinationExists) {
-switch (processorConfig.getConflictResolution()) {
-case REPLACE_RESOLUTION:
-// Remove destination file (newFile) to 
replace
-if (hdfs.delete(newFile, false)) {
-getLogger().info("deleted {} in order 
to replace with the contents of {}",
-new Object[]{newFile, 
flowFile});
-}
-break;
-case IGNORE_RESOLUTION:
-session.transfer(flowFile, REL_SUCCESS);
-getLogger().info(
-"transferring {} to success 
because file with same name already exists",
-new Object[]{flowFile});
-return null;
-case FAIL_RESOLUTION:
-
session.transfer(session.penalize(flowFile), REL_FAILURE);
-getLogger().warn(
-"penalizing {} and routing to 
failure because file with same name already exists",
-new Object[]{flowFile});
-return null;
-default:
-break;
-}
+ugi.doAs((PrivilegedAction) () -> {

Review Comment:
   I think the patch didn't apply correctly. The NullPointerException still 
occurs. In the processBatchOfFiles method's  catch should look like this:
   
   ```java
   catch (final Throwable t) {
   final Optional causeOptional = 
findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == 
gsse.getMajor());
   if (causeOptional.isPresent()) {
   throw new UncheckedIOException(new 
IOException(causeOptional.get()));
   }
   getLogger().error("Failed to rename on HDFS due to {}", 
new Object[]{t});
   session.transfer(session.penalize(flowFile), 
REL_FAILURE);
   context.yield();
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-05 Thread via GitHub


tpalfy commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1554149017


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:
##
@@ -352,95 +355,95 @@ protected void processBatchOfFiles(final List 
files, final ProcessContext
 
 for (final Path file : files) {
 
-ugi.doAs(new PrivilegedAction() {
-@Override
-public Object run() {
-FlowFile flowFile = session.create(parentFlowFile);
-try {
-final String originalFilename = file.getName();
-final Path outputDirPath = getNormalizedPath(context, 
OUTPUT_DIRECTORY, parentFlowFile);
-final Path newFile = new Path(outputDirPath, 
originalFilename);
-final boolean destinationExists = hdfs.exists(newFile);
-// If destination file already exists, resolve that
-// based on processor configuration
-if (destinationExists) {
-switch (processorConfig.getConflictResolution()) {
-case REPLACE_RESOLUTION:
-// Remove destination file (newFile) to 
replace
-if (hdfs.delete(newFile, false)) {
-getLogger().info("deleted {} in order 
to replace with the contents of {}",
-new Object[]{newFile, 
flowFile});
-}
-break;
-case IGNORE_RESOLUTION:
-session.transfer(flowFile, REL_SUCCESS);
-getLogger().info(
-"transferring {} to success 
because file with same name already exists",
-new Object[]{flowFile});
-return null;
-case FAIL_RESOLUTION:
-
session.transfer(session.penalize(flowFile), REL_FAILURE);
-getLogger().warn(
-"penalizing {} and routing to 
failure because file with same name already exists",
-new Object[]{flowFile});
-return null;
-default:
-break;
-}
+ugi.doAs((PrivilegedAction) () -> {

Review Comment:
   This way MoveHDFS is throwing a NullPointerException when the GSSException 
occurs because it tries the next file after it did a rollback.
   
   Here's a patch (against the current state of this PR) that would help:
   ```
   Subject: [PATCH] Changes base on code review comments
   ---
   Index: 
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===
   diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
   --- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
 (revision 024860a2d8dccbaed8e98d280e1afd22dd0b7201)
   +++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
 (date 1712341636424)
   @@ -47,9 +47,11 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
   +import org.ietf.jgss.GSSException;

import java.io.FileNotFoundException;
import java.io.IOException;
   +import java.io.UncheckedIOException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
   @@ -57,6 +59,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
   +import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
   @@ -325,16 +328,20 @@
queueLock.unlock();
}

   -processBatchOfFiles(files, context, session, flowFile);
   +try {
   +processBatchOfFiles(files, context, session, flowFile);

   +

Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-05 Thread via GitHub


tpalfy commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1554121881


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java:
##
@@ -388,18 +388,18 @@ protected void processBatchOfFiles(final List 
files, final ProcessContext
 flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), outputFilename);
 
 if (!keepSourceFiles && 
!getUserGroupInformation().doAs((PrivilegedExceptionAction) () -> 
hdfs.delete(file, false))) {
-getLogger().warn("Could not remove {} from HDFS. Not 
ingesting this file ...",
-new Object[]{file});
+getLogger().warn("Could not remove {} from HDFS. Not 
ingesting this file ...", file);
 session.remove(flowFile);
 continue;
 }
 
 session.getProvenanceReporter().receive(flowFile, 
file.toString());
 session.transfer(flowFile, REL_SUCCESS);
-getLogger().info("retrieved {} from HDFS {} in {} milliseconds 
at a rate of {}",
-new Object[]{flowFile, file, millis, dataRate});
+getLogger().info("retrieved {} from HDFS {} in {} milliseconds 
at a rate of {}", flowFile, file, millis, dataRate);
+} catch (final IOException e) {

Review Comment:
   (GitHub doesn't allow me to suggest efficiently)
   
   This way we don't handle non-GSS IOExceptions properly.
   
   Instead of 
   ```java
   } catch (final IOException e) {
   handleAuthErrors(e, session, context);
   } catch (final Throwable t) {
   getLogger().error("Error retrieving file {} from HDFS due to 
{}", file, t);
   session.rollback();
   context.yield();
   ```
   we should have
   ```java
   } catch (final Throwable t) {
   if (!handleAuthErrors(t, session, context)) {
   getLogger().error("Error retrieving file {} from HDFS 
due to {}", file, t);
   session.rollback();
   context.yield();
   }
   ```



##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:
##
@@ -352,95 +355,95 @@ protected void processBatchOfFiles(final List 
files, final ProcessContext
 
 for (final Path file : files) {
 
-ugi.doAs(new PrivilegedAction() {
-@Override
-public Object run() {
-FlowFile flowFile = session.create(parentFlowFile);
-try {
-final String originalFilename = file.getName();
-final Path outputDirPath = getNormalizedPath(context, 
OUTPUT_DIRECTORY, parentFlowFile);
-final Path newFile = new Path(outputDirPath, 
originalFilename);
-final boolean destinationExists = hdfs.exists(newFile);
-// If destination file already exists, resolve that
-// based on processor configuration
-if (destinationExists) {
-switch (processorConfig.getConflictResolution()) {
-case REPLACE_RESOLUTION:
-// Remove destination file (newFile) to 
replace
-if (hdfs.delete(newFile, false)) {
-getLogger().info("deleted {} in order 
to replace with the contents of {}",
-new Object[]{newFile, 
flowFile});
-}
-break;
-case IGNORE_RESOLUTION:
-session.transfer(flowFile, REL_SUCCESS);
-getLogger().info(
-"transferring {} to success 
because file with same name already exists",
-new Object[]{flowFile});
-return null;
-case FAIL_RESOLUTION:
-
session.transfer(session.penalize(flowFile), REL_FAILURE);
-getLogger().warn(
-"penalizing {} and routing to 
failure because file with same name already exists",
-new Object[]{flowFile});
-return null;
-default:
-break;
-}
+ugi.doAs((PrivilegedAction) () -> {

Review Comment:
   This way 

Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-04 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1552005815


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java:
##
@@ -177,16 +177,20 @@ public void onTrigger(ProcessContext context, 
ProcessSession session) throws Pro
 flowFile = session.putAttribute(flowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
 
session.getProvenanceReporter().invokeRemoteProcess(flowFile, 
qualifiedPath.toString());
 } catch (IOException ioe) {
-// One possible scenario is that the IOException 
is permissions based, however it would be impractical to check every possible
-// external HDFS authorization tool (Ranger, 
Sentry, etc). Local ACLs could be checked but the operation would be expensive.
-getLogger().warn("Failed to delete file or 
directory", ioe);
-
-Map attributes = 
Maps.newHashMapWithExpectedSize(1);
-// The error message is helpful in understanding 
at a flowfile level what caused the IOException (which ACL is denying the 
operation, e.g.)
-attributes.put(getAttributePrefix() + 
".error.message", ioe.getMessage());
-
-
session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), 
getFailureRelationship());
-failedPath++;
+if (handleAuthErrors(ioe, session, context)) {
+return null;
+} else {
+// One possible scenario is that the 
IOException is permissions based, however it would be impractical to check 
every possible
+// external HDFS authorization tool (Ranger, 
Sentry, etc). Local ACLs could be checked but the operation would be expensive.
+getLogger().warn("Failed to delete file or 
directory", ioe);
+
+Map attributes = 
Maps.newHashMapWithExpectedSize(1);
+// The error message is helpful in 
understanding at a flowfile level what caused the IOException (which ACL is 
denying the operation, e.g.)
+attributes.put(getAttributePrefix() + 
".error.message", ioe.getMessage());
+
+
session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), 
getFailureRelationship());
+failedPath++;
+}
 }
 }
 }

Review Comment:
   Error handling of GSSException should be added to line 213 outer catch as 
well, because hdfs is called outside of the inner catch block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-04-04 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1552005815


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java:
##
@@ -177,16 +177,20 @@ public void onTrigger(ProcessContext context, 
ProcessSession session) throws Pro
 flowFile = session.putAttribute(flowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
 
session.getProvenanceReporter().invokeRemoteProcess(flowFile, 
qualifiedPath.toString());
 } catch (IOException ioe) {
-// One possible scenario is that the IOException 
is permissions based, however it would be impractical to check every possible
-// external HDFS authorization tool (Ranger, 
Sentry, etc). Local ACLs could be checked but the operation would be expensive.
-getLogger().warn("Failed to delete file or 
directory", ioe);
-
-Map attributes = 
Maps.newHashMapWithExpectedSize(1);
-// The error message is helpful in understanding 
at a flowfile level what caused the IOException (which ACL is denying the 
operation, e.g.)
-attributes.put(getAttributePrefix() + 
".error.message", ioe.getMessage());
-
-
session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), 
getFailureRelationship());
-failedPath++;
+if (handleAuthErrors(ioe, session, context)) {
+return null;
+} else {
+// One possible scenario is that the 
IOException is permissions based, however it would be impractical to check 
every possible
+// external HDFS authorization tool (Ranger, 
Sentry, etc). Local ACLs could be checked but the operation would be expensive.
+getLogger().warn("Failed to delete file or 
directory", ioe);
+
+Map attributes = 
Maps.newHashMapWithExpectedSize(1);
+// The error message is helpful in 
understanding at a flowfile level what caused the IOException (which ACL is 
denying the operation, e.g.)
+attributes.put(getAttributePrefix() + 
".error.message", ioe.getMessage());
+
+
session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), 
getFailureRelationship());
+failedPath++;
+}
 }
 }
 }

Review Comment:
   Error handling of GSSException should be added to line 213 outer catch as 
well, because the fileSystem.exists is called outside of the inner catch block.



##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:
##
@@ -294,7 +302,7 @@ public void onTrigger(ProcessContext context, 
ProcessSession session) throws Pro
 if (logEmptyListing.getAndDecrement() > 0) {
 getLogger().info(
 "Obtained file listing in {} milliseconds; listing 
had {} items, {} of which were new",
-new Object[]{millis, listedFiles.size(), 
newItems});
+millis, listedFiles.size(), newItems);

Review Comment:
   Could not comment on line 450: In processBachOfFiles line 450 the 
GSSException should be handled as well.



##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java:
##
@@ -254,8 +254,16 @@ public void onTrigger(ProcessContext context, 
ProcessSession session) throws Pro
 if (!directoryExists) {
 throw new IOException("Input Directory or File does not exist 
in HDFS");
 }
+} catch (final IOException e) {

Review Comment:
   The IOException does not need to be handled differently, it can be handled 
in the Exception catch branch
   
   ```java 
   catch (Exception e) {
   if(!handleAuthErrors(e, session, context)) {
   getLogger().error("Failed to retrieve content from {} for {} 
due to {}; routing to failure", filenameValue, flowFile, e);
   flowFile = session.putAttribute(flowFile, 
"hdfs.failure.reason", e.getMessage());
   flowFile = session.penalize(flowFile);
   session.transfer(flowFile, REL_FAILURE);
   }
   return;
   }
   ```



##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ 

Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-26 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1538284672


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -465,10 +465,16 @@ public void process(InputStream in) throws IOException {
 } catch (final IOException e) {

Review Comment:
   Since we throw the GSSException wrapped in a ProcessException, I think we 
have to catch the ProcessException, then check if it's a GSSException. If so 
rollback, othwerwise throw it away.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-25 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1538284672


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -465,10 +465,16 @@ public void process(InputStream in) throws IOException {
 } catch (final IOException e) {

Review Comment:
   Since we throw the GSSException wrapped in a ProcessException, I think we 
have to catch the ProcessException, then check if it's a GSSException. If so 
rollback, othwerwise throw it away.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-21 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1534451779


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -372,54 +370,56 @@ public Object run() {
 
 // Write FlowFile to temp file on HDFS
 final StopWatch stopWatch = new StopWatch(true);
-session.read(putFlowFile, new InputStreamCallback() {
-
-@Override
-public void process(InputStream in) throws IOException 
{
-OutputStream fos = null;
-Path createdFile = null;
-try {
-if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
-fos = hdfs.append(copyFile, bufferSize);
-} else {
-final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-if (shouldIgnoreLocality(context, 
session)) {
-
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-}
+session.read(putFlowFile, in -> {
+OutputStream fos = null;
+Path createdFile = null;
+try {
+if (conflictResponse.equals(APPEND_RESOLUTION) && 
destinationExists) {
+fos = hdfs.append(copyFile, bufferSize);
+} else {
+final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
 
-fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
-null, null);
+if (shouldIgnoreLocality(context, session)) {
+
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
 }
 
-if (codec != null) {
-fos = codec.createOutputStream(fos);
+fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
+null, null);
+}
+
+if (codec != null) {
+fos = codec.createOutputStream(fos);
+}
+createdFile = actualCopyFile;
+BufferedInputStream bis = new 
BufferedInputStream(in);
+StreamUtils.copy(bis, fos);
+bis = null;
+fos.flush();
+} catch (IOException e) {
+// Catch GSSExceptions and reset the resources
+Optional causeOptional = 
findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == 
gsse.getMajor());
+if (causeOptional.isPresent()) {
+getLogger().warn("Error authenticating when 
performing file operation, resetting HDFS resources: {} ", 
e.getCause().getMessage());
+
hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
+}

Review Comment:
   This change intended to throw the exception to the outer catch block that 
will log it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-21 Thread via GitHub


mattyb149 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1534228284


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -372,54 +370,56 @@ public Object run() {
 
 // Write FlowFile to temp file on HDFS
 final StopWatch stopWatch = new StopWatch(true);
-session.read(putFlowFile, new InputStreamCallback() {
-
-@Override
-public void process(InputStream in) throws IOException 
{
-OutputStream fos = null;
-Path createdFile = null;
-try {
-if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
-fos = hdfs.append(copyFile, bufferSize);
-} else {
-final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-if (shouldIgnoreLocality(context, 
session)) {
-
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-}
+session.read(putFlowFile, in -> {
+OutputStream fos = null;
+Path createdFile = null;
+try {
+if (conflictResponse.equals(APPEND_RESOLUTION) && 
destinationExists) {
+fos = hdfs.append(copyFile, bufferSize);
+} else {
+final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
 
-fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
-null, null);
+if (shouldIgnoreLocality(context, session)) {
+
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
 }
 
-if (codec != null) {
-fos = codec.createOutputStream(fos);
+fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
+null, null);
+}
+
+if (codec != null) {
+fos = codec.createOutputStream(fos);
+}
+createdFile = actualCopyFile;
+BufferedInputStream bis = new 
BufferedInputStream(in);
+StreamUtils.copy(bis, fos);
+bis = null;
+fos.flush();
+} catch (IOException e) {
+// Catch GSSExceptions and reset the resources
+Optional causeOptional = 
findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == 
gsse.getMajor());
+if (causeOptional.isPresent()) {
+getLogger().warn("Error authenticating when 
performing file operation, resetting HDFS resources: {} ", 
e.getCause().getMessage());
+
hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
+}

Review Comment:
   This suggested change removes the error logging and the reset, is that 
intentional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-16 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1527388335


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -372,54 +370,56 @@ public Object run() {
 
 // Write FlowFile to temp file on HDFS
 final StopWatch stopWatch = new StopWatch(true);
-session.read(putFlowFile, new InputStreamCallback() {
-
-@Override
-public void process(InputStream in) throws IOException 
{
-OutputStream fos = null;
-Path createdFile = null;
-try {
-if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
-fos = hdfs.append(copyFile, bufferSize);
-} else {
-final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-if (shouldIgnoreLocality(context, 
session)) {
-
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-}
+session.read(putFlowFile, in -> {
+OutputStream fos = null;
+Path createdFile = null;
+try {
+if (conflictResponse.equals(APPEND_RESOLUTION) && 
destinationExists) {
+fos = hdfs.append(copyFile, bufferSize);
+} else {
+final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
 
-fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
-null, null);
+if (shouldIgnoreLocality(context, session)) {
+
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
 }
 
-if (codec != null) {
-fos = codec.createOutputStream(fos);
+fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
+null, null);
+}
+
+if (codec != null) {
+fos = codec.createOutputStream(fos);
+}
+createdFile = actualCopyFile;
+BufferedInputStream bis = new 
BufferedInputStream(in);
+StreamUtils.copy(bis, fos);
+bis = null;
+fos.flush();
+} catch (IOException e) {

Review Comment:
   Yeah I think it's fine. If executing the mentioned lines does not cause 
discrepancy, I prefer the simplicity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-15 Thread via GitHub


mattyb149 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1526776640


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -372,54 +370,56 @@ public Object run() {
 
 // Write FlowFile to temp file on HDFS
 final StopWatch stopWatch = new StopWatch(true);
-session.read(putFlowFile, new InputStreamCallback() {
-
-@Override
-public void process(InputStream in) throws IOException 
{
-OutputStream fos = null;
-Path createdFile = null;
-try {
-if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
-fos = hdfs.append(copyFile, bufferSize);
-} else {
-final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-if (shouldIgnoreLocality(context, 
session)) {
-
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-}
+session.read(putFlowFile, in -> {
+OutputStream fos = null;
+Path createdFile = null;
+try {
+if (conflictResponse.equals(APPEND_RESOLUTION) && 
destinationExists) {
+fos = hdfs.append(copyFile, bufferSize);
+} else {
+final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
 
-fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
-null, null);
+if (shouldIgnoreLocality(context, session)) {
+
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
 }
 
-if (codec != null) {
-fos = codec.createOutputStream(fos);
+fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
+null, null);
+}
+
+if (codec != null) {
+fos = codec.createOutputStream(fos);
+}
+createdFile = actualCopyFile;
+BufferedInputStream bis = new 
BufferedInputStream(in);
+StreamUtils.copy(bis, fos);
+bis = null;
+fos.flush();
+} catch (IOException e) {
+// Catch GSSExceptions and reset the resources
+Optional causeOptional = 
findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == 
gsse.getMajor());
+if (causeOptional.isPresent()) {
+getLogger().warn("Error authenticating when 
performing file operation, resetting HDFS resources: {} ", 
e.getCause().getMessage());

Review Comment:
   Yeah same thing as above, I can add the exception regardless (I think) and 
change the logging level to error if the user needs to be alerted immediately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-15 Thread via GitHub


mattyb149 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1526776640


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -372,54 +370,56 @@ public Object run() {
 
 // Write FlowFile to temp file on HDFS
 final StopWatch stopWatch = new StopWatch(true);
-session.read(putFlowFile, new InputStreamCallback() {
-
-@Override
-public void process(InputStream in) throws IOException 
{
-OutputStream fos = null;
-Path createdFile = null;
-try {
-if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
-fos = hdfs.append(copyFile, bufferSize);
-} else {
-final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-if (shouldIgnoreLocality(context, 
session)) {
-
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-}
+session.read(putFlowFile, in -> {
+OutputStream fos = null;
+Path createdFile = null;
+try {
+if (conflictResponse.equals(APPEND_RESOLUTION) && 
destinationExists) {
+fos = hdfs.append(copyFile, bufferSize);
+} else {
+final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
 
-fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
-null, null);
+if (shouldIgnoreLocality(context, session)) {
+
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
 }
 
-if (codec != null) {
-fos = codec.createOutputStream(fos);
+fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
+null, null);
+}
+
+if (codec != null) {
+fos = codec.createOutputStream(fos);
+}
+createdFile = actualCopyFile;
+BufferedInputStream bis = new 
BufferedInputStream(in);
+StreamUtils.copy(bis, fos);
+bis = null;
+fos.flush();
+} catch (IOException e) {
+// Catch GSSExceptions and reset the resources
+Optional causeOptional = 
findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == 
gsse.getMajor());
+if (causeOptional.isPresent()) {
+getLogger().warn("Error authenticating when 
performing file operation, resetting HDFS resources: {} ", 
e.getCause().getMessage());

Review Comment:
   Yeah same thing as above, I can add the error regardless (I think) and 
change it to error if the user needs to be alerted immediately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-15 Thread via GitHub


mattyb149 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1526775762


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -372,54 +370,56 @@ public Object run() {
 
 // Write FlowFile to temp file on HDFS
 final StopWatch stopWatch = new StopWatch(true);
-session.read(putFlowFile, new InputStreamCallback() {
-
-@Override
-public void process(InputStream in) throws IOException 
{
-OutputStream fos = null;
-Path createdFile = null;
-try {
-if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
-fos = hdfs.append(copyFile, bufferSize);
-} else {
-final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-if (shouldIgnoreLocality(context, 
session)) {
-
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-}
+session.read(putFlowFile, in -> {
+OutputStream fos = null;
+Path createdFile = null;
+try {
+if (conflictResponse.equals(APPEND_RESOLUTION) && 
destinationExists) {
+fos = hdfs.append(copyFile, bufferSize);
+} else {
+final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
 
-fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
-null, null);
+if (shouldIgnoreLocality(context, session)) {
+
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
 }
 
-if (codec != null) {
-fos = codec.createOutputStream(fos);
+fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
+null, null);
+}
+
+if (codec != null) {
+fos = codec.createOutputStream(fos);
+}
+createdFile = actualCopyFile;
+BufferedInputStream bis = new 
BufferedInputStream(in);
+StreamUtils.copy(bis, fos);
+bis = null;
+fos.flush();
+} catch (IOException e) {
+// Catch GSSExceptions and reset the resources
+Optional causeOptional = 
findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == 
gsse.getMajor());
+if (causeOptional.isPresent()) {
+getLogger().warn("Error authenticating when 
performing file operation, resetting HDFS resources: {} ", 
e.getCause().getMessage());

Review Comment:
   The intent with WARN was not to generate a bulletin by default but I'm fine 
either way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-15 Thread via GitHub


mattyb149 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1526688710


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -372,54 +370,56 @@ public Object run() {
 
 // Write FlowFile to temp file on HDFS
 final StopWatch stopWatch = new StopWatch(true);
-session.read(putFlowFile, new InputStreamCallback() {
-
-@Override
-public void process(InputStream in) throws IOException 
{
-OutputStream fos = null;
-Path createdFile = null;
-try {
-if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
-fos = hdfs.append(copyFile, bufferSize);
-} else {
-final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-if (shouldIgnoreLocality(context, 
session)) {
-
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-}
+session.read(putFlowFile, in -> {
+OutputStream fos = null;
+Path createdFile = null;
+try {
+if (conflictResponse.equals(APPEND_RESOLUTION) && 
destinationExists) {
+fos = hdfs.append(copyFile, bufferSize);
+} else {
+final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
 
-fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
-null, null);
+if (shouldIgnoreLocality(context, session)) {
+
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
 }
 
-if (codec != null) {
-fos = codec.createOutputStream(fos);
+fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
+null, null);
+}
+
+if (codec != null) {
+fos = codec.createOutputStream(fos);
+}
+createdFile = actualCopyFile;
+BufferedInputStream bis = new 
BufferedInputStream(in);
+StreamUtils.copy(bis, fos);
+bis = null;
+fos.flush();
+} catch (IOException e) {

Review Comment:
   Works for me, the key was to not have it blow up for a "simple" relogin 
error. I thought I'd found that depending on the config the first create() 
could cause the login error. If anything but a "Kerberos relogin error" occurs, 
I think we should rightfully propagate the exception, I can throw it as a 
ProcessException inside the read(). What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-13 Thread via GitHub


Lehel44 commented on code in PR #8495:
URL: https://github.com/apache/nifi/pull/8495#discussion_r1524238959


##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -372,54 +370,56 @@ public Object run() {
 
 // Write FlowFile to temp file on HDFS
 final StopWatch stopWatch = new StopWatch(true);
-session.read(putFlowFile, new InputStreamCallback() {
-
-@Override
-public void process(InputStream in) throws IOException 
{
-OutputStream fos = null;
-Path createdFile = null;
-try {
-if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
-fos = hdfs.append(copyFile, bufferSize);
-} else {
-final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-if (shouldIgnoreLocality(context, 
session)) {
-
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-}
+session.read(putFlowFile, in -> {
+OutputStream fos = null;
+Path createdFile = null;
+try {
+if (conflictResponse.equals(APPEND_RESOLUTION) && 
destinationExists) {
+fos = hdfs.append(copyFile, bufferSize);
+} else {
+final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
 
-fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
-null, null);
+if (shouldIgnoreLocality(context, session)) {
+
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
 }
 
-if (codec != null) {
-fos = codec.createOutputStream(fos);
+fos = hdfs.create(actualCopyFile, 
FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, 
blockSize,
+null, null);
+}
+
+if (codec != null) {
+fos = codec.createOutputStream(fos);
+}
+createdFile = actualCopyFile;
+BufferedInputStream bis = new 
BufferedInputStream(in);
+StreamUtils.copy(bis, fos);
+bis = null;
+fos.flush();
+} catch (IOException e) {
+// Catch GSSExceptions and reset the resources
+Optional causeOptional = 
findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == 
gsse.getMajor());
+if (causeOptional.isPresent()) {

Review Comment:
   Here we catch the IOException, but in case the underlying cause is not a 
GSSException with an error code NO_CRED, the exception wont be handled and will 
be absorbed by the catch. The IOException should be thrown in the else branch.



##
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java:
##
@@ -372,54 +370,56 @@ public Object run() {
 
 // Write FlowFile to temp file on HDFS
 final StopWatch stopWatch = new StopWatch(true);
-session.read(putFlowFile, new InputStreamCallback() {
-
-@Override
-public void process(InputStream in) throws IOException 
{
-OutputStream fos = null;
-Path createdFile = null;
-try {
-if (conflictResponse.equals(APPEND_RESOLUTION) 
&& destinationExists) {
-fos = hdfs.append(copyFile, bufferSize);
-} else {
-final EnumSet cflags = 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-if (shouldIgnoreLocality(context, 
session)) {
-

Re: [PR] NIFI-12889: Retry Kerberos login on auth failure in HDFS processors [nifi]

2024-03-13 Thread via GitHub


Lehel44 commented on PR #8495:
URL: https://github.com/apache/nifi/pull/8495#issuecomment-1995416957

   reviewing...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org