[GitHub] [nifi] MikeThomsen commented on a change in pull request #4336: NIFI-7515 Added 7Zip support to UnpackContent

2020-06-24 Thread GitBox


MikeThomsen commented on a change in pull request #4336:
URL: https://github.com/apache/nifi/pull/4336#discussion_r444853167



##
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
##
@@ -394,6 +412,65 @@ public void process(final OutputStream out) throws 
IOException {
 }
 }
 
+private static class SevenZipUnpacker extends Unpacker {
+public SevenZipUnpacker(Pattern fileFilter) {
+super(fileFilter);
+}
+
+@Override
+public void unpack(final ProcessSession session, final FlowFile 
source, final List unpacked) {
+final String fragmentId = UUID.randomUUID().toString();
+session.read(source, new InputStreamCallback() {
+@Override
+public void process(final InputStream in) throws IOException {
+
+int fragmentCount = 0;
+byte[] inputData = IOUtils.toByteArray(in);

Review comment:
   I'd have to think about this. I saw an example on google of converting 
an InputStream into the seekablesource (or whatever it's called), but it didn't 
say whether or not the process put it all in memory. We should probably take a 
little time to run that down to see if it's possible to safely wrap an 
InputStream before trying a temp file.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] MikeThomsen commented on a change in pull request #4336: NIFI-7515 Added 7Zip support to UnpackContent

2020-06-24 Thread GitBox


MikeThomsen commented on a change in pull request #4336:
URL: https://github.com/apache/nifi/pull/4336#discussion_r444852394



##
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
##
@@ -394,6 +412,65 @@ public void process(final OutputStream out) throws 
IOException {
 }
 }
 
+private static class SevenZipUnpacker extends Unpacker {
+public SevenZipUnpacker(Pattern fileFilter) {
+super(fileFilter);
+}
+
+@Override
+public void unpack(final ProcessSession session, final FlowFile 
source, final List unpacked) {
+final String fragmentId = UUID.randomUUID().toString();
+session.read(source, new InputStreamCallback() {
+@Override
+public void process(final InputStream in) throws IOException {
+
+int fragmentCount = 0;
+byte[] inputData = IOUtils.toByteArray(in);
+
+SeekableInMemoryByteChannel inMemoryByteChannel = new 
SeekableInMemoryByteChannel(inputData);
+SevenZFile sevenZFile = new 
SevenZFile(inMemoryByteChannel);
+SevenZArchiveEntry sevenZArchiveEntry;
+
+while ((sevenZArchiveEntry = sevenZFile.getNextEntry()) != 
null) {
+if (sevenZArchiveEntry.isDirectory() || 
!fileMatches(sevenZArchiveEntry)) {
+continue;
+}
+
+final File file = new 
File(sevenZArchiveEntry.getName());
+final String parentDirectory = (file.getParent() == 
null) ? "/" : file.getParent();
+final Path absPath = file.toPath().toAbsolutePath();
+final String absPathString = 
absPath.getParent().toString() + "/";
+
+FlowFile unpackedFile = session.create(source);
+try {
+final Map attributes = new 
HashMap<>();
+attributes.put(CoreAttributes.FILENAME.key(), 
file.getName());
+attributes.put(CoreAttributes.PATH.key(), 
parentDirectory);
+attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), 
absPathString);
+attributes.put(CoreAttributes.MIME_TYPE.key(), 
OCTET_STREAM);
+
+attributes.put(FRAGMENT_ID, fragmentId);
+attributes.put(FRAGMENT_INDEX, 
String.valueOf(++fragmentCount));
+
+unpackedFile = 
session.putAllAttributes(unpackedFile, attributes);
+byte[] content = new 
byte[(int)sevenZArchiveEntry.getSize()];

Review comment:
   Hard-coding it between 32kb and 1MB should be fine.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] MikeThomsen commented on a change in pull request #4336: NIFI-7515 Added 7Zip support to UnpackContent

2020-06-23 Thread GitBox


MikeThomsen commented on a change in pull request #4336:
URL: https://github.com/apache/nifi/pull/4336#discussion_r33577



##
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
##
@@ -394,6 +412,65 @@ public void process(final OutputStream out) throws 
IOException {
 }
 }
 
+private static class SevenZipUnpacker extends Unpacker {
+public SevenZipUnpacker(Pattern fileFilter) {
+super(fileFilter);
+}
+
+@Override
+public void unpack(final ProcessSession session, final FlowFile 
source, final List unpacked) {
+final String fragmentId = UUID.randomUUID().toString();
+session.read(source, new InputStreamCallback() {
+@Override
+public void process(final InputStream in) throws IOException {
+
+int fragmentCount = 0;
+byte[] inputData = IOUtils.toByteArray(in);

Review comment:
   This could easily cause the JVM to run out of memory. It looks like what 
you might need to do is create a temporary `java.io.File` and export the 
contents of the flowfile to that because the SevenZFile class doesn't allow you 
to natively use an InputStream. That could be dangerous in its own right, but 
at least users could mitigate it by resizing the partition if there is 
sufficient documentation warning them.

##
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
##
@@ -394,6 +412,65 @@ public void process(final OutputStream out) throws 
IOException {
 }
 }
 
+private static class SevenZipUnpacker extends Unpacker {
+public SevenZipUnpacker(Pattern fileFilter) {
+super(fileFilter);
+}
+
+@Override
+public void unpack(final ProcessSession session, final FlowFile 
source, final List unpacked) {
+final String fragmentId = UUID.randomUUID().toString();
+session.read(source, new InputStreamCallback() {
+@Override
+public void process(final InputStream in) throws IOException {
+
+int fragmentCount = 0;
+byte[] inputData = IOUtils.toByteArray(in);
+
+SeekableInMemoryByteChannel inMemoryByteChannel = new 
SeekableInMemoryByteChannel(inputData);
+SevenZFile sevenZFile = new 
SevenZFile(inMemoryByteChannel);
+SevenZArchiveEntry sevenZArchiveEntry;
+
+while ((sevenZArchiveEntry = sevenZFile.getNextEntry()) != 
null) {
+if (sevenZArchiveEntry.isDirectory() || 
!fileMatches(sevenZArchiveEntry)) {
+continue;
+}
+
+final File file = new 
File(sevenZArchiveEntry.getName());
+final String parentDirectory = (file.getParent() == 
null) ? "/" : file.getParent();
+final Path absPath = file.toPath().toAbsolutePath();
+final String absPathString = 
absPath.getParent().toString() + "/";
+
+FlowFile unpackedFile = session.create(source);
+try {
+final Map attributes = new 
HashMap<>();
+attributes.put(CoreAttributes.FILENAME.key(), 
file.getName());
+attributes.put(CoreAttributes.PATH.key(), 
parentDirectory);
+attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), 
absPathString);
+attributes.put(CoreAttributes.MIME_TYPE.key(), 
OCTET_STREAM);
+
+attributes.put(FRAGMENT_ID, fragmentId);
+attributes.put(FRAGMENT_INDEX, 
String.valueOf(++fragmentCount));
+
+unpackedFile = 
session.putAllAttributes(unpackedFile, attributes);
+byte[] content = new 
byte[(int)sevenZArchiveEntry.getSize()];

Review comment:
   You should be reading the file progressively into a buffer using a loop 
because you could easily blow up the JVM here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org