[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652082#comment-15652082 ]

Joseph Witt commented on NIFI-2850:
---

k - i think it looks good too. +1 merged to master. Testing variety of configs and all behaved well. Saw no impact to performance.

> Provide ability for a FlowFile to be migrated from one Process Session to another
> ---
>
> Key: NIFI-2850
> URL: https://issues.apache.org/jira/browse/NIFI-2850
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Joseph Witt
> Fix For: 1.1.0
>
> Currently, the MergeContent processor creates a separate ProcessSession for
> each FlowFile that it pulls. This is done so that we can ensure that we can
> commit all Process Sessions when a bin is full. Unfortunately, this means
> that MergeContent is required to call ProcessSession.get() many times, which
> adds a lot of contention on the FlowFile Queue. If we allow FlowFiles to be
> migrated from 1 session to another, we can have a session per bin, and then
> use ProcessSession.get(100) to greatly reduce lock contention. This will
> likely have benefits in other processors as well.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
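The idea in the issue description (pull a batch with one session, then hand each FlowFile to a per-bin session that can be committed on its own) can be sketched with a toy model. Everything below is an assumption made for illustration: `ToyQueue`, `ToySession` and `MigrateDemo` are hypothetical stand-ins, not NiFi classes, and the `migrate` shown here only mimics the ownership transfer the issue asks for.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for a FlowFile queue; counts how often its lock is taken. */
final class ToyQueue {
    private final Deque<String> items = new ArrayDeque<>();
    int lockAcquisitions = 0;

    ToyQueue(Collection<String> initial) {
        items.addAll(initial);
    }

    /** Pulls up to max items while holding the queue lock once. */
    synchronized List<String> poll(int max) {
        lockAcquisitions++;
        final List<String> out = new ArrayList<>();
        while (out.size() < max && !items.isEmpty()) {
            out.add(items.poll());
        }
        return out;
    }
}

/** Hypothetical stand-in for a session: owns items until they are migrated or committed. */
final class ToySession {
    private final ToyQueue queue;
    final List<String> owned = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    ToySession(ToyQueue queue) {
        this.queue = queue;
    }

    /** Batch pull: one queue lock acquisition for up to max items. */
    List<String> get(int max) {
        final List<String> pulled = queue.poll(max);
        owned.addAll(pulled);
        return pulled;
    }

    /** Transfers ownership of the given items to another session. */
    void migrate(ToySession newOwner, Collection<String> toMove) {
        if (!owned.containsAll(toMove)) {
            throw new IllegalArgumentException("items are not owned by this session");
        }
        owned.removeAll(toMove);
        newOwner.owned.addAll(toMove);
    }

    /** Commits only what this session currently owns. */
    void commit() {
        committed.addAll(owned);
        owned.clear();
    }
}

final class MigrateDemo {
    /** Returns {queue lock acquisitions, committed in bin A, still owned by bin B, left in polling session}. */
    static int[] run() {
        final ToyQueue queue = new ToyQueue(Arrays.asList("a1", "b1", "a2", "b2"));
        final ToySession polling = new ToySession(queue);
        final ToySession binA = new ToySession(queue);
        final ToySession binB = new ToySession(queue);

        final List<String> batch = polling.get(100); // one lock acquisition for the whole batch
        final List<String> forA = new ArrayList<>();
        final List<String> forB = new ArrayList<>();
        for (final String item : batch) {
            (item.startsWith("a") ? forA : forB).add(item);
        }
        polling.migrate(binA, forA);
        polling.migrate(binB, forB);
        binA.commit(); // bin A is "full" and commits without touching bin B

        return new int[] {queue.lockAcquisitions, binA.committed.size(), binB.owned.size(), polling.owned.size()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

In this toy, four items are pulled with a single lock acquisition, and committing bin A leaves bin B's items uncommitted, which is the property the one-session-per-FlowFile design was originally protecting.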
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652079#comment-15652079 ]

ASF GitHub Bot commented on NIFI-2850:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1115
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15652077#comment-15652077 ]

ASF subversion and git services commented on NIFI-2850:
---

Commit c441a8696d8dce47f263846cdf5ecac8506ba5d3 in nifi's branch refs/heads/master from [~markap14]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=c441a86 ]

NIFI-2850 This closes #1115. Added a migrate() method to ProcessSession and refactored BinFiles and MergeContent to use it
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651824#comment-15651824 ]

ASF GitHub Bot commented on NIFI-2850:
---

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r87260657

    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -377,7 +377,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
             final AtomicInteger successfulRecordCount = new AtomicInteger(0);
             List successfulRecords = new LinkedList<>();
             final FlowFile inputFlowFile = flowFile;
    -        final AtomicBoolean incomingFlowFileTransferred = new AtomicBoolean(false);
    +        final AtomicBoolean processingFailure = new AtomicBoolean(false);
    --- End diff --

    @joewitt so in updating MockProcessSession to support the session migration, I found that the MockProcessSession was not properly keeping track of the 'recursionSet' the way StandardProcessSession does. I fixed that bug, as it had to be addressed in order to properly implement the session migration in the Mock framework. Once I fixed that bug in MockProcessSession, it then exposed this unrelated bug in PutHiveStreaming. Because the bug in PutHiveStreaming would now cause unit test failures, I addressed it in this PR as well.
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651771#comment-15651771 ]

ASF GitHub Bot commented on NIFI-2850:
---

Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r87256917

    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -377,7 +377,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
             final AtomicInteger successfulRecordCount = new AtomicInteger(0);
             List successfulRecords = new LinkedList<>();
             final FlowFile inputFlowFile = flowFile;
    -        final AtomicBoolean incomingFlowFileTransferred = new AtomicBoolean(false);
    +        final AtomicBoolean processingFailure = new AtomicBoolean(false);
    --- End diff --

    why is there any change to the hive streaming processor in this commit/PR?
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15651470#comment-15651470 ]

Joseph Witt commented on NIFI-2850:
---

will reply with my findings of evaluation as well.
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15633092#comment-15633092 ]

ASF GitHub Bot commented on NIFI-2850:
---

Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/1115

    @markap14 all is good. +1 here but think someone else should give it a look as well given its complexity.

> Provide ability for a FlowFile to be migrated from one Process Session to another
> ---
>
> Key: NIFI-2850
> URL: https://issues.apache.org/jira/browse/NIFI-2850
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Oleg Zhurakousky
> Fix For: 1.1.0
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628953#comment-15628953 ]

ASF GitHub Bot commented on NIFI-2850:
---

Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/1115

    @olegz Thanks for reviewing! I have responded to your concerns above. If you feel they need more discussion, then we certainly can. Thanks!
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628907#comment-15628907 ]

ASF GitHub Bot commented on NIFI-2850:
---

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r86136287

    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
    @@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
                 }

                 final ProcessSession session = sessionFactory.createSession();
    -            FlowFile flowFile = session.get();
    -            if (flowFile == null) {
    +            final List<FlowFile> flowFiles = session.get(1000);
    +            if (flowFiles.isEmpty()) {
                     break;
                 }

    -            flowFile = this.preprocessFlowFile(context, session, flowFile);
    -
    -            String groupId = this.getGroupId(context, flowFile);
    -
    -            final boolean binned = binManager.offer(groupId, flowFile, session);
    -
    -            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
    -            if (!binned) {
    -                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    -                bin.offer(flowFile, session);
    -                this.readyBins.add(bin);
    +            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
    +            for (FlowFile flowFile : flowFiles) {
    +                flowFile = this.preprocessFlowFile(context, session, flowFile);
    +                final String groupingIdentifier = getGroupId(context, flowFile);
    +                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
    --- End diff --

    I don't believe so. Using putIfAbsent, we would be creating a new ArrayList every time. By using the computeIfAbsent, it allows us to create the ArrayList only if the key is not already present.
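The difference discussed in this review comment can be checked in isolation. The sketch below is a stand-alone illustration, not code from the PR: `GroupingDemo` and its method names are hypothetical, and plain strings stand in for FlowFiles. Besides allocating a list on every call, `Map.putIfAbsent` returns the previous value, which is `null` the first time a key is seen, so chaining `.add(...)` onto it fails for new keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical demo class; strings stand in for FlowFiles and grouping identifiers. */
final class GroupingDemo {

    /** Groups {key, value} pairs; the ArrayList is only created when the key is new. */
    static Map<String, List<String>> groupWithCompute(List<String[]> pairs) {
        final Map<String, List<String>> groups = new HashMap<>();
        for (final String[] pair : pairs) {
            groups.computeIfAbsent(pair[0], id -> new ArrayList<>()).add(pair[1]);
        }
        return groups;
    }

    /** Returns true if chaining add() onto putIfAbsent() throws for a key that is not yet present. */
    static boolean putIfAbsentChainFails() {
        final Map<String, List<String>> groups = new HashMap<>();
        try {
            // putIfAbsent returns the previous mapping, which is null for a new key.
            groups.putIfAbsent("a", new ArrayList<>()).add("x");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        final List<String[]> pairs = Arrays.asList(
                new String[] {"a", "1"}, new String[] {"b", "2"}, new String[] {"a", "3"});
        System.out.println(groupWithCompute(pairs));
        System.out.println(putIfAbsentChainFails());
    }
}
```

A `putIfAbsent` variant can be made to work by following it with a separate `get`, but that still allocates a list per call, which is the cost the reply above points out.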
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15609009#comment-15609009 ]

ASF GitHub Bot commented on NIFI-2850:
---

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r84719151

    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
    @@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
                 }

                 final ProcessSession session = sessionFactory.createSession();
    -            FlowFile flowFile = session.get();
    -            if (flowFile == null) {
    +            final List<FlowFile> flowFiles = session.get(1000);
    +            if (flowFiles.isEmpty()) {
                     break;
                 }

    -            flowFile = this.preprocessFlowFile(context, session, flowFile);
    -
    -            String groupId = this.getGroupId(context, flowFile);
    -
    -            final boolean binned = binManager.offer(groupId, flowFile, session);
    -
    -            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
    -            if (!binned) {
    -                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    -                bin.offer(flowFile, session);
    -                this.readyBins.add(bin);
    +            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
    +            for (FlowFile flowFile : flowFiles) {
    +                flowFile = this.preprocessFlowFile(context, session, flowFile);
    +                final String groupingIdentifier = getGroupId(context, flowFile);
    +                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
                 }

    -            flowFilesBinned++;
    +            for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
    +                final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
    +                for (final FlowFile flowFile : unbinned) {
    +                    Bin bin = new Bin(session, 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    +                    bin.offer(flowFile, session);
    +                    this.readyBins.add(bin);
    +                }
    +            }
    --- End diff --

    After looking at ```BinManager.offer(..)``` I am not sure I understand what's happening in the inner loop above. Aren't you essentially doing the same thing in ```BinManager:201```?
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15609008#comment-15609008 ]

ASF GitHub Bot commented on NIFI-2850:
---

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r84713334

    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
    @@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
                 }

                 final ProcessSession session = sessionFactory.createSession();
    -            FlowFile flowFile = session.get();
    -            if (flowFile == null) {
    +            final List<FlowFile> flowFiles = session.get(1000);
    +            if (flowFiles.isEmpty()) {
                     break;
                 }

    -            flowFile = this.preprocessFlowFile(context, session, flowFile);
    -
    -            String groupId = this.getGroupId(context, flowFile);
    -
    -            final boolean binned = binManager.offer(groupId, flowFile, session);
    -
    -            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
    -            if (!binned) {
    -                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    -                bin.offer(flowFile, session);
    -                this.readyBins.add(bin);
    +            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
    +            for (FlowFile flowFile : flowFiles) {
    +                flowFile = this.preprocessFlowFile(context, session, flowFile);
    +                final String groupingIdentifier = getGroupId(context, flowFile);
    +                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
    --- End diff --

    Given that you're not really computing anything in the provided function and simply returning an empty list wouldn't this be more appropriate
    ```
    flowFileGroups.putIfAbsent(groupingIdentifier, new ArrayList<>()).add(flowFile);
    ```
[jira] [Commented] (NIFI-2850) Provide ability for a FlowFile to be migrated from one Process Session to another
[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1396#comment-1396 ]

ASF GitHub Bot commented on NIFI-2850:
---

GitHub user markap14 opened a pull request:

    https://github.com/apache/nifi/pull/1115

    NIFI-2850: Added a migrate() method to ProcessSession and refactored …

    Thank you for submitting a contribution to Apache NiFi.

    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:

    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    - [ ] Is your initial contribution a single, squashed commit?

    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

    …BinFiles and MergeContent to use it

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/markap14/nifi NIFI-2850

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1115.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1115

commit d939a3d64fcc97a80322f292d13e3d61485a1b3e
Author: Mark Payne
Date: 2016-09-29T18:41:35Z

    NIFI-2850: Added a migrate() method to ProcessSession and refactored BinFiles and MergeContent to use it

> Provide ability for a FlowFile to be migrated from one Process Session to another
> ---
>
> Key: NIFI-2850
> URL: https://issues.apache.org/jira/browse/NIFI-2850
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.1.0