[GitHub] [nifi] MikeThomsen commented on a change in pull request #4336: NIFI-7515 Added 7Zip support to UnpackContent
MikeThomsen commented on a change in pull request #4336:
URL: https://github.com/apache/nifi/pull/4336#discussion_r444853167

## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java ##
@@ -394,6 +412,65 @@ public void process(final OutputStream out) throws IOException {
             }
         }

+    private static class SevenZipUnpacker extends Unpacker {
+        public SevenZipUnpacker(Pattern fileFilter) {
+            super(fileFilter);
+        }
+
+        @Override
+        public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
+            final String fragmentId = UUID.randomUUID().toString();
+            session.read(source, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+
+                    int fragmentCount = 0;
+                    byte[] inputData = IOUtils.toByteArray(in);

Review comment:
I'd have to think about this. I saw an example on google of converting an InputStream into the seekablesource (or whatever it's called), but it didn't say whether or not the process put it all in memory. We should probably take a little time to run that down to see if it's possible to safely wrap an InputStream before trying a temp file.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi] MikeThomsen commented on a change in pull request #4336: NIFI-7515 Added 7Zip support to UnpackContent
MikeThomsen commented on a change in pull request #4336:
URL: https://github.com/apache/nifi/pull/4336#discussion_r444852394

## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java ##
@@ -394,6 +412,65 @@ public void process(final OutputStream out) throws IOException {
             }
         }

+    private static class SevenZipUnpacker extends Unpacker {
+        public SevenZipUnpacker(Pattern fileFilter) {
+            super(fileFilter);
+        }
+
+        @Override
+        public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
+            final String fragmentId = UUID.randomUUID().toString();
+            session.read(source, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+
+                    int fragmentCount = 0;
+                    byte[] inputData = IOUtils.toByteArray(in);
+
+                    SeekableInMemoryByteChannel inMemoryByteChannel = new SeekableInMemoryByteChannel(inputData);
+                    SevenZFile sevenZFile = new SevenZFile(inMemoryByteChannel);
+                    SevenZArchiveEntry sevenZArchiveEntry;
+
+                    while ((sevenZArchiveEntry = sevenZFile.getNextEntry()) != null) {
+                        if (sevenZArchiveEntry.isDirectory() || !fileMatches(sevenZArchiveEntry)) {
+                            continue;
+                        }
+
+                        final File file = new File(sevenZArchiveEntry.getName());
+                        final String parentDirectory = (file.getParent() == null) ? "/" : file.getParent();
+                        final Path absPath = file.toPath().toAbsolutePath();
+                        final String absPathString = absPath.getParent().toString() + "/";
+
+                        FlowFile unpackedFile = session.create(source);
+                        try {
+                            final Map<String, String> attributes = new HashMap<>();
+                            attributes.put(CoreAttributes.FILENAME.key(), file.getName());
+                            attributes.put(CoreAttributes.PATH.key(), parentDirectory);
+                            attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+                            attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+
+                            attributes.put(FRAGMENT_ID, fragmentId);
+                            attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
+
+                            unpackedFile = session.putAllAttributes(unpackedFile, attributes);
+                            byte[] content = new byte[(int)sevenZArchiveEntry.getSize()];

Review comment:
Hard-coding it between 32 KB and 1 MB should be fine.
[GitHub] [nifi] MikeThomsen commented on a change in pull request #4336: NIFI-7515 Added 7Zip support to UnpackContent
MikeThomsen commented on a change in pull request #4336:
URL: https://github.com/apache/nifi/pull/4336#discussion_r33577

## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java ##
@@ -394,6 +412,65 @@ public void process(final OutputStream out) throws IOException {
             }
         }

+    private static class SevenZipUnpacker extends Unpacker {
+        public SevenZipUnpacker(Pattern fileFilter) {
+            super(fileFilter);
+        }
+
+        @Override
+        public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
+            final String fragmentId = UUID.randomUUID().toString();
+            session.read(source, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+
+                    int fragmentCount = 0;
+                    byte[] inputData = IOUtils.toByteArray(in);

Review comment:
This could easily cause the JVM to run out of memory. It looks like what you might need to do is create a temporary `java.io.File` and export the contents of the flowfile to that because the SevenZFile class doesn't allow you to natively use an InputStream. That could be dangerous in its own right, but at least users could mitigate it by resizing the partition if there is sufficient documentation warning them.
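To make the temp-file suggestion above concrete, here is a minimal sketch using only the JDK. It is not the PR's code: the class name `TempFileSpool`, the method `spoolToTempFile`, and the `unpack-7z-` prefix are hypothetical, and a `ByteArrayInputStream` stands in for the FlowFile content stream. The point is that the archive bytes go to disk instead of into one heap array, and that the result can be opened as a `FileChannel`, which implements the same `SeekableByteChannel` interface as the in-memory channel the diff passes to `SevenZFile`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class TempFileSpool {

    // Hypothetical helper: streams the input to a temp file so the archive
    // is never held in a single byte[] on the heap.
    static Path spoolToTempFile(InputStream in) throws IOException {
        Path tmp = Files.createTempFile("unpack-7z-", ".tmp");
        try {
            // createTempFile already created the file, so overwrite it.
            Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
            return tmp;
        } catch (IOException e) {
            // Do not leave a partial file behind on failure.
            Files.deleteIfExists(tmp);
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the FlowFile content (assumption for this sketch).
        InputStream in = new ByteArrayInputStream(new byte[200_000]);
        Path tmp = spoolToTempFile(in);
        try (SeekableByteChannel channel = FileChannel.open(tmp, StandardOpenOption.READ)) {
            // A reader that needs random access would be given `channel` here.
            System.out.println(channel.size());
        } finally {
            // Always remove the temp file, even if reading fails.
            Files.deleteIfExists(tmp);
        }
    }
}
```

The trade-off is the one the comment names: memory pressure becomes disk pressure, so the temp directory's partition has to be sized for the largest expected archive.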
## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java ##
@@ -394,6 +412,65 @@ public void process(final OutputStream out) throws IOException {
             }
         }

+    private static class SevenZipUnpacker extends Unpacker {
+        public SevenZipUnpacker(Pattern fileFilter) {
+            super(fileFilter);
+        }
+
+        @Override
+        public void unpack(final ProcessSession session, final FlowFile source, final List<FlowFile> unpacked) {
+            final String fragmentId = UUID.randomUUID().toString();
+            session.read(source, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+
+                    int fragmentCount = 0;
+                    byte[] inputData = IOUtils.toByteArray(in);
+
+                    SeekableInMemoryByteChannel inMemoryByteChannel = new SeekableInMemoryByteChannel(inputData);
+                    SevenZFile sevenZFile = new SevenZFile(inMemoryByteChannel);
+                    SevenZArchiveEntry sevenZArchiveEntry;
+
+                    while ((sevenZArchiveEntry = sevenZFile.getNextEntry()) != null) {
+                        if (sevenZArchiveEntry.isDirectory() || !fileMatches(sevenZArchiveEntry)) {
+                            continue;
+                        }
+
+                        final File file = new File(sevenZArchiveEntry.getName());
+                        final String parentDirectory = (file.getParent() == null) ? "/" : file.getParent();
+                        final Path absPath = file.toPath().toAbsolutePath();
+                        final String absPathString = absPath.getParent().toString() + "/";
+
+                        FlowFile unpackedFile = session.create(source);
+                        try {
+                            final Map<String, String> attributes = new HashMap<>();
+                            attributes.put(CoreAttributes.FILENAME.key(), file.getName());
+                            attributes.put(CoreAttributes.PATH.key(), parentDirectory);
+                            attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
+                            attributes.put(CoreAttributes.MIME_TYPE.key(), OCTET_STREAM);
+
+                            attributes.put(FRAGMENT_ID, fragmentId);
+                            attributes.put(FRAGMENT_INDEX, String.valueOf(++fragmentCount));
+
+                            unpackedFile = session.putAllAttributes(unpackedFile, attributes);
+                            byte[] content = new byte[(int)sevenZArchiveEntry.getSize()];

Review comment:
You should be reading the file progressively into a buffer using a loop because you could easily blow up the JVM here.
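The progressive read requested here might look like the following sketch. It is an illustration, not the PR's eventual fix: the class name `BufferedEntryCopy`, the `copy` method, and the 64 KB constant are hypothetical (the size is picked from the 32 KB to 1 MB range mentioned elsewhere in this thread), and plain `java.io` streams stand in for the archive entry and the FlowFile's output stream. Unlike `new byte[(int) entry.getSize()]`, heap use stays constant regardless of entry size, and the `long`-to-`int` cast that would misbehave for entries over 2 GB disappears.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class BufferedEntryCopy {

    // Hypothetical fixed buffer size, chosen inside the 32 KB to 1 MB range.
    static final int BUFFER_SIZE = 64 * 1024;

    // Copies everything from `in` to `out` through one reusable buffer and
    // returns the number of bytes copied.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[BUFFER_SIZE];
        long total = 0;
        int read;
        // read() returns -1 at end of stream; it may fill only part of the buffer.
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        // Stand-ins for the entry data and the FlowFile content (assumptions).
        InputStream entry = new ByteArrayInputStream(new byte[300_000]);
        ByteArrayOutputStream flowFileContent = new ByteArrayOutputStream();
        System.out.println(copy(entry, flowFileContent)); // prints 300000
    }
}
```

In the processor itself the same loop would presumably sit inside the callback that writes the unpacked FlowFile, reading from the current archive entry rather than from a byte array.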