[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/439#discussion_r63429069

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---
@@ -2308,18 +2308,50 @@ public void exportTo(final FlowFile source, final Path destination, final boolea
     public void exportTo(final FlowFile source, final OutputStream destination) {
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
+
+        if(record.getCurrentClaim() == null) {
+            return;
+        }
+
         try {
-            if (record.getCurrentClaim() == null) {
-                return;
+            ensureNotAppending(record.getCurrentClaim());
+        } catch (final IOException e) {
+            throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
+        }
+
+        try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
+            final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+            final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+            final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+
+            // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
+            // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
+            // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
+            // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
+            // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
+            final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+            boolean cnfeThrown = false;
+
+            try {
+                recursionSet.add(source);
+                StreamUtils.skip(ffais, record.getCurrentClaimOffset());
--- End diff --

Correct @markap14. I updated and rebased the PR taking into account your comments.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/439#discussion_r63425823

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---
@@ -323,6 +327,37 @@ public void process(final OutputStream outputStream) throws IOException {
         assertDisabled(outputStreamHolder.get());
     }
+    @Test(expected=ProcessException.class)
+    public void testExportTo() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile flowFile = session.get();
+        assertNotNull(flowFile);
+
+        flowFile = session.append(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(OutputStream out) throws IOException {
+                out.write("Hello World".getBytes());
+            }
+        });
+
+        // should be OK
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        session.exportTo(flowFile, os);
+        assertEquals("Hello World", new String(os.toByteArray()));
+        os.close();
+
+        // should throw ProcessException because of IOException (from processor code)
+        FileOutputStream mock = Mockito.mock(FileOutputStream.class);
+        doThrow(new IOException()).when(mock).write((byte[]) notNull(), any(Integer.class), any(Integer.class));
+        session.exportTo(flowFile, mock);
--- End diff --

I would recommend wrapping this call in a try/catch and ensuring that ProcessException is thrown here. Indicating that it is expected in the @Test annotation can be somewhat error-prone, as several other method calls within this method could actually throw ProcessException.
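The recommendation above can be illustrated without JUnit, Mockito, or NiFi on the classpath. The following is only a minimal sketch under stated assumptions: `ProcessFailure`, `FailingOutputStream`, and `export` are hypothetical stand-ins for `ProcessException`, the mocked `FileOutputStream`, and `session.exportTo`, and are not NiFi APIs. It shows why scoping the try/catch to the single call is stricter than declaring the exception for the whole test method.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class ExpectedExceptionSketch {

    // Hypothetical stand-in for NiFi's ProcessException.
    static class ProcessFailure extends RuntimeException {
        ProcessFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical destination that always fails on write, playing the role of the mock.
    static class FailingOutputStream extends OutputStream {
        @Override
        public void write(int b) throws IOException {
            throw new IOException("simulated destination failure");
        }
    }

    // Hypothetical stand-in for session.exportTo(): destination IOExceptions become ProcessFailure.
    static void export(byte[] content, OutputStream destination) {
        try {
            destination.write(content, 0, content.length);
        } catch (IOException e) {
            throw new ProcessFailure("Failed to export content", e);
        }
    }

    // Returns true only if the failing export call itself throws ProcessFailure.
    static boolean failingExportThrows() {
        byte[] content = "Hello World".getBytes(StandardCharsets.UTF_8);

        // Setup and the "should be OK" export sit outside the try block,
        // so a ProcessFailure thrown here propagates instead of passing the check.
        ByteArrayOutputStream ok = new ByteArrayOutputStream();
        export(content, ok);
        if (!"Hello World".equals(new String(ok.toByteArray(), StandardCharsets.UTF_8))) {
            throw new IllegalStateException("unexpected exported content");
        }

        try {
            export(content, new FailingOutputStream());
            return false; // no exception was thrown, so the check fails
        } catch (ProcessFailure expected) {
            // Also confirm the failure was caused by the destination's IOException.
            return expected.getCause() instanceof IOException;
        }
    }

    public static void main(String[] args) {
        System.out.println("failing export throws: " + failingExportThrows());
    }
}
```

In a real JUnit test the `return false` branch would typically be a `fail(...)` call placed right after the statement that is expected to throw.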
[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/439#discussion_r63425553

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---
@@ -2308,18 +2308,50 @@ public void exportTo(final FlowFile source, final Path destination, final boolea
     public void exportTo(final FlowFile source, final OutputStream destination) {
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
+
+        if(record.getCurrentClaim() == null) {
+            return;
+        }
+
         try {
-            if (record.getCurrentClaim() == null) {
-                return;
+            ensureNotAppending(record.getCurrentClaim());
+        } catch (final IOException e) {
+            throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
+        }
+
+        try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
+            final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+            final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+            final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+
+            // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
+            // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
+            // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
+            // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
+            // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
+            final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+            boolean cnfeThrown = false;
+
+            try {
+                recursionSet.add(source);
+                StreamUtils.skip(ffais, record.getCurrentClaimOffset());
+                StreamUtils.copy(ffais, destination, source.getSize());
+            } catch (final ContentNotFoundException cnfe) {
+                cnfeThrown = true;
+                throw cnfe;
+            } finally {
+                recursionSet.remove(source);
+                IOUtils.closeQuietly(ffais);
+                // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate.
--- End diff --

I think the comment here is supposed to say "need to re-throw" rather than "need to re-thrown".
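The code comment in the diff above describes wrapping the repository stream so that its IOExceptions can be told apart from IOExceptions raised on the destination side. A minimal, self-contained sketch of that idea follows. It is an illustration only: `SourceAccessException`, `AccessInputStream`, and `copy` are hypothetical names chosen for this sketch and do not reproduce NiFi's `FlowFileAccessInputStream`, and the sketch leaves out the `cnfeThrown` bookkeeping.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class AccessStreamSketch {

    // Hypothetical unchecked exception marking a failure on the source (repository) side.
    static class SourceAccessException extends RuntimeException {
        SourceAccessException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Wraps a source stream and translates its read IOExceptions into SourceAccessException.
    static class AccessInputStream extends FilterInputStream {
        AccessInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() {
            try {
                return super.read();
            } catch (IOException e) {
                throw new SourceAccessException("could not read source content", e);
            }
        }

        @Override
        public int read(byte[] b, int off, int len) {
            try {
                return super.read(b, off, len);
            } catch (IOException e) {
                throw new SourceAccessException("could not read source content", e);
            }
        }
    }

    // Copies source to destination and reports which side failed, if any.
    static String copy(InputStream source, OutputStream destination) {
        try (InputStream in = new AccessInputStream(source)) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer, 0, buffer.length)) != -1) {
                destination.write(buffer, 0, n);
            }
            return "ok";
        } catch (SourceAccessException e) {
            return "source failed";
        } catch (IOException e) {
            // Reads are already translated, so an IOException here comes from the destination.
            return "destination failed";
        }
    }

    // Test helper: a source whose reads always fail.
    static InputStream failingSource() {
        return new InputStream() {
            @Override
            public int read() throws IOException {
                throw new IOException("simulated repository failure");
            }
        };
    }

    // Test helper: a destination whose writes always fail.
    static OutputStream failingDestination() {
        return new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                throw new IOException("simulated destination failure");
            }
        };
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        System.out.println(copy(new ByteArrayInputStream(data), new ByteArrayOutputStream()));
        System.out.println(copy(failingSource(), new ByteArrayOutputStream()));
        System.out.println(copy(new ByteArrayInputStream(data), failingDestination()));
    }
}
```

Because the wrapper converts every read failure into an unchecked type, any checked IOException that reaches the outer catch can be attributed to the destination stream.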
[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/439#discussion_r63425128

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---
@@ -2308,18 +2308,50 @@ public void exportTo(final FlowFile source, final Path destination, final boolea
     public void exportTo(final FlowFile source, final OutputStream destination) {
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
+
+        if(record.getCurrentClaim() == null) {
+            return;
+        }
+
         try {
-            if (record.getCurrentClaim() == null) {
-                return;
+            ensureNotAppending(record.getCurrentClaim());
+        } catch (final IOException e) {
+            throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
+        }
+
+        try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
+            final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+            final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+            final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+
+            // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
+            // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
+            // and translates into either FlowFileAccessException or ContentNotFoundException. We keep track of any
+            // ContentNotFoundException because if it is thrown, the Processor code may catch it and do something else with it
+            // but in reality, if it is thrown, we want to know about it and handle it, even if the Processor code catches it.
+            final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
+            boolean cnfeThrown = false;
+
+            try {
+                recursionSet.add(source);
+                StreamUtils.skip(ffais, record.getCurrentClaimOffset());
--- End diff --

I don't believe we want to be skipping here. This is done already in the call to getInputStream() above, is it not?
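To make the concern about skipping twice concrete, here is a small sketch using only `java.io`. It assumes, as the reviewer suggests, that the stream handed back is already positioned at the claim offset; `openAt` is a hypothetical stand-in for `getInputStream()` and `skipFully` for `StreamUtils.skip`, so neither reflects NiFi's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class DoubleSkipSketch {

    // Skips exactly n bytes or fails; InputStream.skip may skip fewer bytes than requested.
    static void skipFully(InputStream in, long n) throws IOException {
        long remaining = n;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped <= 0) {
                throw new IOException("could not skip " + n + " bytes");
            }
            remaining -= skipped;
        }
    }

    // Hypothetical stand-in for getInputStream(): the returned stream is already at the offset.
    static InputStream openAt(byte[] claim, long offset) throws IOException {
        InputStream in = new ByteArrayInputStream(claim);
        skipFully(in, offset);
        return in;
    }

    // Reads the content from the offset, optionally skipping the offset a second time.
    static String read(String claim, long offset, boolean skipAgain) {
        byte[] bytes = claim.getBytes(StandardCharsets.UTF_8);
        try (InputStream in = openAt(bytes, offset)) {
            if (skipAgain) {
                skipFully(in, offset);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[64];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read("0123456789", 3, false)); // prints 3456789
        System.out.println(read("0123456789", 3, true));  // prints 6789
    }
}
```

With an offset of 3, the second skip silently drops the first three bytes of the intended content, which is the kind of error an extra skip after an already-positioned stream would introduce.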
[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/439#discussion_r63423341

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---
@@ -2308,18 +2308,46 @@ public void exportTo(final FlowFile source, final Path destination, final boolea
     public void exportTo(final FlowFile source, final OutputStream destination) {
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
+
         try {
-            if (record.getCurrentClaim() == null) {
--- End diff --

That's a mistake, no valid reason. PR updated.
[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/439#discussion_r63344780

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---
@@ -2308,18 +2308,46 @@ public void exportTo(final FlowFile source, final Path destination, final boolea
     public void exportTo(final FlowFile source, final OutputStream destination) {
         validateRecordState(source);
         final StandardRepositoryRecord record = records.get(source);
+
         try {
-            if (record.getCurrentClaim() == null) {
--- End diff --

Can you explain the reasoning behind removing this? If there is no content claim, I think it still makes sense to return immediately. This would happen, for instance, with some source processors such as ListFile that don't actually write any content to the FlowFile.
[GitHub] nifi pull request: NIFI-1866 ProcessException handling in Standard...
GitHub user pvillard31 opened a pull request:

    https://github.com/apache/nifi/pull/439

    NIFI-1866 ProcessException handling in StandardProcessSession

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pvillard31/nifi NIFI-1866

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/439.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #439

commit 925e87ffd64f35ac47ae41c8466d217d0eccad36
Author: Pierre Villard
Date: 2016-05-12T21:31:45Z

    NIFI-1866 ProcessException handling in StandardProcessSession