[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...

2016-05-16 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/439#discussion_r63429069
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 ---
@@ -2308,18 +2308,50 @@ public void exportTo(final FlowFile source, final 
Path destination, final boolea
 public void exportTo(final FlowFile source, final OutputStream 
destination) {
 validateRecordState(source);
 final StandardRepositoryRecord record = records.get(source);
+
+if(record.getCurrentClaim() == null) {
+return;
+}
+
 try {
-if (record.getCurrentClaim() == null) {
-return;
+ensureNotAppending(record.getCurrentClaim());
+} catch (final IOException e) {
+throw new FlowFileAccessException("Failed to access 
ContentClaim for " + source.toString(), e);
+}
+
+try (final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset());
+final InputStream limitedIn = new 
LimitedInputStream(rawIn, source.getSize());
+final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
+final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+
+// We want to differentiate between IOExceptions thrown by the 
repository and IOExceptions thrown from
+// Processor code. As a result, as have the 
FlowFileAccessInputStream that catches IOException from the repository
+// and translates into either FlowFileAccessException or 
ContentNotFoundException. We keep track of any
+// ContentNotFoundException because if it is thrown, the 
Processor code may catch it and do something else with it
+// but in reality, if it is thrown, we want to know about it 
and handle it, even if the Processor code catches it.
+final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+boolean cnfeThrown = false;
+
+try {
+recursionSet.add(source);
+StreamUtils.skip(ffais, record.getCurrentClaimOffset());
--- End diff --

Correct @markap14. I updated and rebased the PR taking into account your 
comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...

2016-05-16 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/439#discussion_r63425823
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 ---
@@ -323,6 +327,37 @@ public void process(final OutputStream outputStream) 
throws IOException {
 assertDisabled(outputStreamHolder.get());
 }
 
+@Test(expected=ProcessException.class)
+public void testExportTo() throws IOException {
+final ContentClaim claim = contentRepo.create(false);
+final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
+.contentClaim(claim)
+.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+.entryDate(System.currentTimeMillis())
+.build();
+flowFileQueue.put(flowFileRecord);
+FlowFile flowFile = session.get();
+assertNotNull(flowFile);
+
+flowFile = session.append(flowFile, new OutputStreamCallback() {
+@Override
+public void process(OutputStream out) throws IOException {
+out.write("Hello World".getBytes());
+}
+});
+
+// should be OK
+ByteArrayOutputStream os = new ByteArrayOutputStream();
+session.exportTo(flowFile, os);
+assertEquals("Hello World", new String(os.toByteArray()));
+os.close();
+
+// should throw ProcessException because of IOException (from 
processor code)
+FileOutputStream mock = Mockito.mock(FileOutputStream.class);
+doThrow(new IOException()).when(mock).write((byte[]) notNull(), 
any(Integer.class), any(Integer.class));
+session.exportTo(flowFile, mock);
--- End diff --

I would recommend wrapping this call in a try/catch and ensuring that 
ProcessException is thrown here. Indicating that it is expected in the @Test 
annotation can be somewhat error-prone, as several other method calls within 
this method could actually throw ProcessException


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...

2016-05-16 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/439#discussion_r63425553
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 ---
@@ -2308,18 +2308,50 @@ public void exportTo(final FlowFile source, final 
Path destination, final boolea
 public void exportTo(final FlowFile source, final OutputStream 
destination) {
 validateRecordState(source);
 final StandardRepositoryRecord record = records.get(source);
+
+if(record.getCurrentClaim() == null) {
+return;
+}
+
 try {
-if (record.getCurrentClaim() == null) {
-return;
+ensureNotAppending(record.getCurrentClaim());
+} catch (final IOException e) {
+throw new FlowFileAccessException("Failed to access 
ContentClaim for " + source.toString(), e);
+}
+
+try (final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset());
+final InputStream limitedIn = new 
LimitedInputStream(rawIn, source.getSize());
+final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
+final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+
+// We want to differentiate between IOExceptions thrown by the 
repository and IOExceptions thrown from
+// Processor code. As a result, as have the 
FlowFileAccessInputStream that catches IOException from the repository
+// and translates into either FlowFileAccessException or 
ContentNotFoundException. We keep track of any
+// ContentNotFoundException because if it is thrown, the 
Processor code may catch it and do something else with it
+// but in reality, if it is thrown, we want to know about it 
and handle it, even if the Processor code catches it.
+final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+boolean cnfeThrown = false;
+
+try {
+recursionSet.add(source);
+StreamUtils.skip(ffais, record.getCurrentClaimOffset());
+StreamUtils.copy(ffais, destination, source.getSize());
+} catch (final ContentNotFoundException cnfe) {
+cnfeThrown = true;
+throw cnfe;
+} finally {
+recursionSet.remove(source);
+IOUtils.closeQuietly(ffais);
+// if cnfeThrown is true, we don't need to re-thrown the 
Exception; it will propagate.
--- End diff --

i think the comment here is supposed to see "need to re-throw" rather than 
"need to re-thrown"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...

2016-05-16 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/439#discussion_r63425128
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 ---
@@ -2308,18 +2308,50 @@ public void exportTo(final FlowFile source, final 
Path destination, final boolea
 public void exportTo(final FlowFile source, final OutputStream 
destination) {
 validateRecordState(source);
 final StandardRepositoryRecord record = records.get(source);
+
+if(record.getCurrentClaim() == null) {
+return;
+}
+
 try {
-if (record.getCurrentClaim() == null) {
-return;
+ensureNotAppending(record.getCurrentClaim());
+} catch (final IOException e) {
+throw new FlowFileAccessException("Failed to access 
ContentClaim for " + source.toString(), e);
+}
+
+try (final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset());
+final InputStream limitedIn = new 
LimitedInputStream(rawIn, source.getSize());
+final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
+final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+
+// We want to differentiate between IOExceptions thrown by the 
repository and IOExceptions thrown from
+// Processor code. As a result, as have the 
FlowFileAccessInputStream that catches IOException from the repository
+// and translates into either FlowFileAccessException or 
ContentNotFoundException. We keep track of any
+// ContentNotFoundException because if it is thrown, the 
Processor code may catch it and do something else with it
+// but in reality, if it is thrown, we want to know about it 
and handle it, even if the Processor code catches it.
+final FlowFileAccessInputStream ffais = new 
FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+boolean cnfeThrown = false;
+
+try {
+recursionSet.add(source);
+StreamUtils.skip(ffais, record.getCurrentClaimOffset());
--- End diff --

I don't believe we want to be skipping here. This is done already in the 
call to getInputStream() above, is it not?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...

2016-05-16 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/439#discussion_r63423341
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 ---
@@ -2308,18 +2308,46 @@ public void exportTo(final FlowFile source, final 
Path destination, final boolea
 public void exportTo(final FlowFile source, final OutputStream 
destination) {
 validateRecordState(source);
 final StandardRepositoryRecord record = records.get(source);
+
 try {
-if (record.getCurrentClaim() == null) {
--- End diff --

That's a mistake, no valid reason. PR updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...

2016-05-16 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/439#discussion_r63344780
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 ---
@@ -2308,18 +2308,46 @@ public void exportTo(final FlowFile source, final 
Path destination, final boolea
 public void exportTo(final FlowFile source, final OutputStream 
destination) {
 validateRecordState(source);
 final StandardRepositoryRecord record = records.get(source);
+
 try {
-if (record.getCurrentClaim() == null) {
--- End diff --

Can you explain the reasoning behind removing this? If there is no content 
claim, i think it still makes sense to return immediately. This would happen, 
for instance, with some source processors such as ListFile that don't actually 
write any content to the FlowFile.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...

2016-05-12 Thread pvillard31
GitHub user pvillard31 opened a pull request:

https://github.com/apache/nifi/pull/439

NIFI-1866 ProcessException handling in StandardProcessSession



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pvillard31/nifi NIFI-1866

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/439.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #439


commit 925e87ffd64f35ac47ae41c8466d217d0eccad36
Author: Pierre Villard 
Date:   2016-05-12T21:31:45Z

NIFI-1866 ProcessException handling in StandardProcessSession




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---