nifi git commit: NIFI-2636 resolve thread safety problem in UnpackContent

2016-09-23 Thread jskora
Repository: nifi
Updated Branches:
  refs/heads/master b34de74db -> 5a3d00c7b


NIFI-2636 resolve thread safety problem in UnpackContent

Signed-off-by: Joe Skora 


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a3d00c7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a3d00c7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a3d00c7

Branch: refs/heads/master
Commit: 5a3d00c7bb171b4ca35c5533f670ade9513ab6d3
Parents: b34de74
Author: Mike Moser 
Authored: Tue Aug 23 16:41:13 2016 -0400
Committer: Joe Skora 
Committed: Fri Sep 23 13:25:03 2016 -0400

--
 .../nifi/processors/standard/UnpackContent.java | 67 +---
 .../processors/standard/TestUnpackContent.java  | 31 +
 2 files changed, 62 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/5a3d00c7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
--
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index d2de33c..0437ed1 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -145,8 +145,6 @@ public class UnpackContent extends AbstractProcessor {
 private Set relationships;
 private List properties;
 
-private Unpacker unpacker;
-private boolean addFragmentAttrs;
 private Pattern fileFilter;
 
 private Unpacker tarUnpacker;
@@ -178,7 +176,6 @@ public class UnpackContent extends AbstractProcessor {
 
 @OnStopped
 public void onStopped() {
-unpacker = null;
 fileFilter = null;
 }
 
@@ -191,35 +188,6 @@ public class UnpackContent extends AbstractProcessor {
 }
 }
 
-public void initUnpacker(PackageFormat packagingFormat) {
-switch (packagingFormat) {
-case TAR_FORMAT:
-case X_TAR_FORMAT:
-unpacker = tarUnpacker;
-addFragmentAttrs = true;
-break;
-case ZIP_FORMAT:
-unpacker = zipUnpacker;
-addFragmentAttrs = true;
-break;
-case FLOWFILE_STREAM_FORMAT_V2:
-unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV2());
-addFragmentAttrs = false;
-break;
-case FLOWFILE_STREAM_FORMAT_V3:
-unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV3());
-addFragmentAttrs = false;
-break;
-case FLOWFILE_TAR_FORMAT:
-unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV1());
-addFragmentAttrs = false;
-break;
-case AUTO_DETECT_FORMAT:
-// The format of the unpacker should be known before 
initialization
-throw new ProcessException(packagingFormat + " is not a valid 
packaging format");
-}
-}
-
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
 FlowFile flowFile = session.get();
@@ -247,11 +215,38 @@ public class UnpackContent extends AbstractProcessor {
 logger.info("Cannot unpack {} because its mime.type attribute 
is set to '{}', which is not a format that can be unpacked; routing to 
'success'", new Object[]{flowFile, mimeType});
 session.transfer(flowFile, REL_SUCCESS);
 return;
-} else {
-initUnpacker(packagingFormat);
 }
-} else {
-initUnpacker(packagingFormat);
+}
+
+// set the Unpacker to use for this FlowFile.  FlowFileUnpackager 
objects maintain state and are not reusable.
+final Unpacker unpacker;
+final boolean addFragmentAttrs;
+switch (packagingFormat) {
+case TAR_FORMAT:
+case X_TAR_FORMAT:
+unpacker = tarUnpacker;
+addFragmentAttrs = true;
+break;
+case ZIP_FORMAT:
+unpacker = zipUnpacker;
+addFragmentAttrs = true;
+break;
+case FLOWFILE_STREAM_FORMAT_V2:
+unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
+addFragmentAttrs = false;
+break;
+  

nifi git commit: NIFI-2636 resolve thread safety problem in UnpackContent

2016-09-22 Thread jskora
Repository: nifi
Updated Branches:
  refs/heads/0.x c2e98f96e -> a952dc96a


NIFI-2636 resolve thread safety problem in UnpackContent

Signed-off-by: Joe Skora 


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a952dc96
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a952dc96
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a952dc96

Branch: refs/heads/0.x
Commit: a952dc96a23a418e25f9dc49c8874b59adc34654
Parents: c2e98f9
Author: Mike Moser 
Authored: Tue Aug 23 16:41:13 2016 -0400
Committer: Joe Skora 
Committed: Thu Sep 22 14:23:30 2016 -0400

--
 .../nifi/processors/standard/UnpackContent.java | 67 +---
 .../processors/standard/TestUnpackContent.java  | 31 +
 2 files changed, 62 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/a952dc96/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
--
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 45e17c1..85b8029 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -145,8 +145,6 @@ public class UnpackContent extends AbstractProcessor {
 private Set relationships;
 private List properties;
 
-private Unpacker unpacker;
-private boolean addFragmentAttrs;
 private Pattern fileFilter;
 
 private Unpacker tarUnpacker;
@@ -178,7 +176,6 @@ public class UnpackContent extends AbstractProcessor {
 
 @OnStopped
 public void onStopped() {
-unpacker = null;
 fileFilter = null;
 }
 
@@ -191,35 +188,6 @@ public class UnpackContent extends AbstractProcessor {
 }
 }
 
-public void initUnpacker(PackageFormat packagingFormat) {
-switch (packagingFormat) {
-case TAR_FORMAT:
-case X_TAR_FORMAT:
-unpacker = tarUnpacker;
-addFragmentAttrs = true;
-break;
-case ZIP_FORMAT:
-unpacker = zipUnpacker;
-addFragmentAttrs = true;
-break;
-case FLOWFILE_STREAM_FORMAT_V2:
-unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV2());
-addFragmentAttrs = false;
-break;
-case FLOWFILE_STREAM_FORMAT_V3:
-unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV3());
-addFragmentAttrs = false;
-break;
-case FLOWFILE_TAR_FORMAT:
-unpacker = new FlowFileStreamUnpacker(new 
FlowFileUnpackagerV1());
-addFragmentAttrs = false;
-break;
-case AUTO_DETECT_FORMAT:
-// The format of the unpacker should be known before 
initialization
-throw new ProcessException(packagingFormat + " is not a valid 
packaging format");
-}
-}
-
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
 FlowFile flowFile = session.get();
@@ -247,11 +215,38 @@ public class UnpackContent extends AbstractProcessor {
 logger.info("Cannot unpack {} because its mime.type attribute 
is set to '{}', which is not a format that can be unpacked; routing to 
'success'", new Object[]{flowFile, mimeType});
 session.transfer(flowFile, REL_SUCCESS);
 return;
-} else {
-initUnpacker(packagingFormat);
 }
-} else {
-initUnpacker(packagingFormat);
+}
+
+// set the Unpacker to use for this FlowFile.  FlowFileUnpackager 
objects maintain state and are not reusable.
+final Unpacker unpacker;
+final boolean addFragmentAttrs;
+switch (packagingFormat) {
+case TAR_FORMAT:
+case X_TAR_FORMAT:
+unpacker = tarUnpacker;
+addFragmentAttrs = true;
+break;
+case ZIP_FORMAT:
+unpacker = zipUnpacker;
+addFragmentAttrs = true;
+break;
+case FLOWFILE_STREAM_FORMAT_V2:
+unpacker = new FlowFileStreamUnpacker(new FlowFileUnpackagerV2());
+addFragmentAttrs = false;
+break;
+