nifi git commit: NIFI-2636 resolve thread safety problem in UnpackContent
Repository: nifi
Updated Branches:
  refs/heads/master b34de74db -> 5a3d00c7b

NIFI-2636 resolve thread safety problem in UnpackContent

Signed-off-by: Joe Skora

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a3d00c7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a3d00c7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a3d00c7

Branch: refs/heads/master
Commit: 5a3d00c7bb171b4ca35c5533f670ade9513ab6d3
Parents: b34de74
Author: Mike Moser
Authored: Tue Aug 23 16:41:13 2016 -0400
Committer: Joe Skora
Committed: Fri Sep 23 13:25:03 2016 -0400

--
.../nifi/processors/standard/UnpackContent.java | 67 +---
.../processors/standard/TestUnpackContent.java | 31 +
2 files changed, 62 insertions(+), 36 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a3d00c7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
--
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index d2de33c..0437ed1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -145,8 +145,6 @@ public class UnpackContent extends AbstractProcessor { private Set relationships; private List properties; -private Unpacker unpacker; -private boolean addFragmentAttrs; private Pattern fileFilter; private Unpacker tarUnpacker; @@ -178,7 +176,6 @@ public class UnpackContent extends AbstractProcessor { @OnStopped public void onStopped() { -unpacker = null; fileFilter = null; } @@ -191,35 +188,6 @@ 
public class UnpackContent extends AbstractProcessor { } } -public void initUnpacker(PackageFormat packagingFormat) { -switch (packagingFormat) { -case TAR_FORMAT: -case X_TAR_FORMAT: -unpacker = tarUnpacker; -addFragmentAttrs = true; -break; -case ZIP_FORMAT: -unpacker = zipUnpacker; -addFragmentAttrs = true; -break; -case FLOWFILE_STREAM_FORMAT_V2: -unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); -addFragmentAttrs = false; -break; -case FLOWFILE_STREAM_FORMAT_V3: -unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); -addFragmentAttrs = false; -break; -case FLOWFILE_TAR_FORMAT: -unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); -addFragmentAttrs = false; -break; -case AUTO_DETECT_FORMAT: -// The format of the unpacker should be known before initialization -throw new ProcessException(packagingFormat + " is not a valid packaging format"); -} -} - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -247,11 +215,38 @@ public class UnpackContent extends AbstractProcessor { logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); session.transfer(flowFile, REL_SUCCESS); return; -} else { -initUnpacker(packagingFormat); } -} else { -initUnpacker(packagingFormat); +} + +// set the Unpacker to use for this FlowFile. FlowFileUnpackager objects maintain state and are not reusable. +final Unpacker unpacker; +final boolean addFragmentAttrs; +switch (packagingFormat) { +case TAR_FORMAT: +case X_TAR_FORMAT: +unpacker = tarUnpacker; +addFragmentAttrs = true; +break; +case ZIP_FORMAT: +unpacker = zipUnpacker; +addFragmentAttrs = true; +break; +case FLOWFILE_STREAM_FORMAT_V2: +unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); +addFragmentAttrs = false; +break; +
nifi git commit: NIFI-2636 resolve thread safety problem in UnpackContent
Repository: nifi
Updated Branches:
  refs/heads/0.x c2e98f96e -> a952dc96a

NIFI-2636 resolve thread safety problem in UnpackContent

Signed-off-by: Joe Skora

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a952dc96
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a952dc96
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a952dc96

Branch: refs/heads/0.x
Commit: a952dc96a23a418e25f9dc49c8874b59adc34654
Parents: c2e98f9
Author: Mike Moser
Authored: Tue Aug 23 16:41:13 2016 -0400
Committer: Joe Skora
Committed: Thu Sep 22 14:23:30 2016 -0400

--
.../nifi/processors/standard/UnpackContent.java | 67 +---
.../processors/standard/TestUnpackContent.java | 31 +
2 files changed, 62 insertions(+), 36 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/nifi/blob/a952dc96/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
--
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java index 45e17c1..85b8029 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java @@ -145,8 +145,6 @@ public class UnpackContent extends AbstractProcessor { private Set relationships; private List properties; -private Unpacker unpacker; -private boolean addFragmentAttrs; private Pattern fileFilter; private Unpacker tarUnpacker; @@ -178,7 +176,6 @@ public class UnpackContent extends AbstractProcessor { @OnStopped public void onStopped() { -unpacker = null; fileFilter = null; } @@ -191,35 +188,6 @@ public 
class UnpackContent extends AbstractProcessor { } } -public void initUnpacker(PackageFormat packagingFormat) { -switch (packagingFormat) { -case TAR_FORMAT: -case X_TAR_FORMAT: -unpacker = tarUnpacker; -addFragmentAttrs = true; -break; -case ZIP_FORMAT: -unpacker = zipUnpacker; -addFragmentAttrs = true; -break; -case FLOWFILE_STREAM_FORMAT_V2: -unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); -addFragmentAttrs = false; -break; -case FLOWFILE_STREAM_FORMAT_V3: -unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV3()); -addFragmentAttrs = false; -break; -case FLOWFILE_TAR_FORMAT: -unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV1()); -addFragmentAttrs = false; -break; -case AUTO_DETECT_FORMAT: -// The format of the unpacker should be known before initialization -throw new ProcessException(packagingFormat + " is not a valid packaging format"); -} -} - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -247,11 +215,38 @@ public class UnpackContent extends AbstractProcessor { logger.info("Cannot unpack {} because its mime.type attribute is set to '{}', which is not a format that can be unpacked; routing to 'success'", new Object[]{flowFile, mimeType}); session.transfer(flowFile, REL_SUCCESS); return; -} else { -initUnpacker(packagingFormat); } -} else { -initUnpacker(packagingFormat); +} + +// set the Unpacker to use for this FlowFile. FlowFileUnpackager objects maintain state and are not reusable. +final Unpacker unpacker; +final boolean addFragmentAttrs; +switch (packagingFormat) { +case TAR_FORMAT: +case X_TAR_FORMAT: +unpacker = tarUnpacker; +addFragmentAttrs = true; +break; +case ZIP_FORMAT: +unpacker = zipUnpacker; +addFragmentAttrs = true; +break; +case FLOWFILE_STREAM_FORMAT_V2: +unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2()); +addFragmentAttrs = false; +break; +