Hi Jeff, Yes, I took a fresh clone from git hub . I cleaned my maven repo as well before building .
Attached StatusMerger.java and ProcessorStatusSnapshotDTO.java All, It will be great if others also check if it is happening when u build. I am worried whether I am doing some thing really stupid. Thanks & Regards Tijo Thomas On 16-Sep-2016 7:47 pm, "Jeff" <[email protected]> wrote: > Thank you for the information. Did you try running the tests on a fresh > clone of the github repo? > > Could you please link me to or include the contents of StatusMerger.java > and ProcessorStatusSnapshotDTO.java? > > On Fri, Sep 16, 2016 at 4:17 AM Tijo Thomas <[email protected]> wrote: > >> >> Output when I ran through IDE, >> >> >> Condition not satisfied: >> >> returnedJson == expectedJson >> | | | >> | | {"id":"hidden","groupId":"hidd >> en","name":"hidden","type":"hidden","bytesRead":0,"bytesWritten":0,"read":"0 >> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:00:00.000","activeThreadCount":0} >> | false >> | 1 difference (99% similarity) >> | {"id":"hidden","groupId":"hidden","name":"hidden","type":" >> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0 >> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:(3)0:00.000","activeThreadCount":0} >> | {"id":"hidden","groupId":"hidden","name":"hidden","type":" >> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0 >> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:(0)0:00.000","activeThreadCount":0} >> {"id":"hidden","groupId":"hidden","name":"hidden","type":" >> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0 >> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:30:00.000","activeThreadCount":0} >> >> Expected :{"id":"hidden","groupId":"hidden","name":"hidden","type":" >> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0 >> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:00:00.000","activeThreadCount":0} >> >> Actual :{"id":"hidden","groupId":"hidden","name":"hidden","type":" >> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0 >> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:30:00.000","activeThreadCount":0} >> >> >> Taskduration is showing 30 min different when compared to expected and >> actuals. >> >> at org.apache.nifi.cluster.manager.PermissionBasedStatusMergerSpec.Merge >> ProcessorStatusSnapshotDTO(PermissionBasedStatusMergerSpec.groovy:257) >> >> >> >> Condition not satisfied: >> >> returnedJson == expectedJson >> | | | >> | | {"id":"hidden","groupId":"hidd >> en","name":"hidden","type":"hidden","bytesRead":0,"bytesWritten":0,"read":"0 >> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:00:00.000","activeThreadCount":0} >> | false >> | 1 difference (99% similarity) >> | {"id":"hidden","groupId":"hidden","name":"hidden","type":" >> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0 >> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:(3)0:00.000","activeThreadCount":0} >> | {"id":"hidden","groupId":"hidden","name":"hidden","type":" >> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0 >> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:(0)0:00.000","activeThreadCount":0} >> {"id":"hidden","groupId":"hidden","name":"hidden","type":" >> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0 >> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:30:00.000","activeThreadCount":0} >> >> Expected :{"id":"hidden","groupId":"hidden","name":"hidden","type":" >> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0 >> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:00:00.000","activeThreadCount":0} >> >> Actual :{"id":"hidden","groupId":"hidden","name":"hidden","type":" >> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0 >> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0 >> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >> tasksDuration":"00:30:00.000","activeThreadCount":0} >> >> >> mvn --version >> >> Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; >> 2015-11-10T22:11:47+05:30) >> Maven home: /home/tijo/apache-maven-3.3.9 >> Java version: 1.8.0_91, vendor: Oracle Corporation >> Java home: /home/tijo/jdk1.8.0_91/jre >> Default locale: en_IN, platform encoding: UTF-8 >> OS name: "linux", version: "3.13.0-95-generic", arch: "amd64", family: >> "unix" >> >> >> Tijo >> >> >> >> On Thu, Sep 15, 2016 at 8:07 PM, Jeff <[email protected]> wrote: >> >>> Hello, >>> >>> Could you try building a fresh clone of the NIFI repo? What is your dev >>> environment? What versions of java and maven are you running? >>> >>> Does PermissionBasedStatusMergerSpec work when you run it directly via >>> maven and/or an IDE? >>> >>> On Wed, Sep 14, 2016 at 2:37 PM Tijo Thomas <[email protected]> >>> wrote: >>> >>>> Hi Jeff, >>>> >>>> I tried replacing the mvn repo and did mvn clean package. >>>> >>>> I am still getting this error. >>>> >>>> I am sorry I am not getting any clue why it is failing. >>>> >>>> Can you check again. >>>> >>>> Tijo >>>> >>>> On Wed, Sep 14, 2016 at 9:27 PM, Jeff <[email protected]> wrote: >>>> >>>>> Ok, sounds good! Please let us know! >>>>> >>>>> On Wed, Sep 14, 2016 at 12:04 AM Tijo Thomas <[email protected]> >>>>> wrote: >>>>> >>>>>> Sorry to reply late. I was on vacation for last 4 days. >>>>>> >>>>>> I have not modified any files. >>>>>> >>>>>> I think there is some problem with my repo. I will make a new repo >>>>>> and try again. Still the problem exist I will post it again in the >>>>>> group. >>>>>> >>>>>> Thank you very much for your support. >>>>>> >>>>>> Tijo >>>>>> >>>>>> On 10-Sep-2016 6:44 pm, "Jeff" <[email protected]> wrote: >>>>>> >>>>>>> Tijo, >>>>>>> >>>>>>> Have you modified ProcessorStatusSnapshotDTO.java or >>>>>>> PermissionBasedStatusMergerSpec.groovy? >>>>>>> >>>>>>> On Sat, Sep 10, 2016 at 7:48 AM Tijo Thomas <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Jeff >>>>>>>> >>>>>>>> I recently rebase from master. >>>>>>>> Then I cloned again and ran mvn package >>>>>>>> >>>>>>>> Tijo >>>>>>>> >>>>>>>> On 09-Sep-2016 9:12 pm, "Jeff" <[email protected]> wrote: >>>>>>>> >>>>>>>>> Tijo, >>>>>>>>> >>>>>>>>> I just ran this test on master and it's passing for me. Can you >>>>>>>>> provide some details about the branch you're on when running the >>>>>>>>> tests? I >>>>>>>>> see that tasksDuration is 00:30:00.000 when it's expecting >>>>>>>>> 00:00:00.000, >>>>>>>>> and that's why the JSON isn't matching. >>>>>>>>> >>>>>>>>> On Thu, Sep 8, 2016 at 4:58 PM Tijo Thomas <[email protected]> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi >>>>>>>>>> Nifi test case is failing (PermissionBasedStatusMergerSpec) . >>>>>>>>>> This is written in Grovy .. not comfortable with Groovy . >>>>>>>>>> >>>>>>>>>> Running org.apache.nifi.cluster.manage >>>>>>>>>> r.PermissionBasedStatusMergerSpec >>>>>>>>>> Tests run: 20, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: >>>>>>>>>> 0.922 sec <<< FAILURE! - in org.apache.nifi.cluster.manage >>>>>>>>>> r.PermissionBasedStatusMergerSpec >>>>>>>>>> Merge ProcessorStatusSnapshotDTO[0]( >>>>>>>>>> org.apache.nifi.cluster.manager.PermissionBasedStatusMergerSpec) >>>>>>>>>> Time elapsed: 0.144 sec <<< FAILURE! >>>>>>>>>> org.spockframework.runtime.SpockComparisonFailure: Condition not >>>>>>>>>> satisfied: >>>>>>>>>> >>>>>>>>>> returnedJson == expectedJson >>>>>>>>>> | | | >>>>>>>>>> | | {"id":"hidden","groupId":"hidd >>>>>>>>>> en","name":"hidden","type":"hidden","bytesRead":0,"bytesWritten":0,"read":"0 >>>>>>>>>> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0 >>>>>>>>>> (0 bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >>>>>>>>>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >>>>>>>>>> tasksDuration":"00:00:00.000","activeThreadCount":0} >>>>>>>>>> | false >>>>>>>>>> | 1 difference (99% similarity) >>>>>>>>>> | {"id":"hidden","groupId":"hidd >>>>>>>>>> en","name":"hidden","type":"hidden","bytesRead":0,"bytesWritten":0,"read":"0 >>>>>>>>>> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0 >>>>>>>>>> (0 bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >>>>>>>>>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >>>>>>>>>> tasksDuration":"00:(3)0:00.000","activeThreadCount":0} >>>>>>>>>> | {"id":"hidden","groupId":"hidd >>>>>>>>>> en","name":"hidden","type":"hidden","bytesRead":0,"bytesWritten":0,"read":"0 >>>>>>>>>> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0 >>>>>>>>>> (0 bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >>>>>>>>>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >>>>>>>>>> tasksDuration":"00:(0)0:00.000","activeThreadCount":0} >>>>>>>>>> {"id":"hidden","groupId":"hidden","name":"hidden","type":" >>>>>>>>>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 >>>>>>>>>> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0 >>>>>>>>>> (0 bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0 >>>>>>>>>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0"," >>>>>>>>>> tasksDuration":"00:30:00.000","activeThreadCount":0} >>>>>>>>>> >>>>>>>>>> at org.apache.nifi.cluster.manage >>>>>>>>>> r.PermissionBasedStatusMergerSpec.Merge >>>>>>>>>> ProcessorStatusSnapshotDTO(PermissionBasedStatusMergerSpec. >>>>>>>>>> groovy:257) >>>>>>>>>> >>>>>>>>>> Merge ProcessorStatusSnapshotDTO[1]( >>>>>>>>>> org.apache.nifi.cluster.manager.PermissionBasedStatusMergerSpec) >>>>>>>>>> Time elapsed: 0.01 sec <<< FAILURE! >>>>>>>>>> org.spockframework.runtime.SpockComparisonFailure: Condition not >>>>>>>>>> satisfied: >>>>>>>>>> >>>>>>>>>> Tijo >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>> >>
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.nifi.web.api.dto.status; import com.wordnik.swagger.annotations.ApiModelProperty; import javax.xml.bind.annotation.XmlType; /** * DTO for serializing the status of a processor. */ @XmlType(name = "processorStatusSnapshot") public class ProcessorStatusSnapshotDTO implements Cloneable { private String id; private String groupId; private String name; private String type; private String runStatus; private Long bytesRead = 0L; private Long bytesWritten = 0L; private String read; private String written; private Integer flowFilesIn = 0; private Long bytesIn = 0L; private String input; private Integer flowFilesOut = 0; private Long bytesOut = 0L; private String output; private Integer taskCount = 0; private Long tasksDurationNanos = 0L; private String tasks; private String tasksDuration; private Integer activeThreadCount = 0; /* getters / setters */ /** * @return The processor id */ @ApiModelProperty("The id of the processor.") public String getId() { return id; } public void setId(String id) { this.id = id; } /** * @return The processor name */ @ApiModelProperty("The name of the prcessor.") public String getName() { return name; } public void setName(String name) { this.name = name; } /** * @return The processor type */ @ApiModelProperty("The type of the processor.") public String getType() { return type; } public void setType(String type) { this.type = type; } /** * @return run status of this processor */ @ApiModelProperty( value = "The state of the processor.", allowableValues = "RUNNING, STOPPED, DISABLED, INVALID" ) public String getRunStatus() { return runStatus; } public void setRunStatus(String runStatus) { this.runStatus = runStatus; } /** * @return The total count and size of flow files that have been accepted in the last five minutes */ @ApiModelProperty("The count/size of flowfiles that have been accepted in the last 5 minutes.") public String getInput() { return input; } public void setInput(String input) { this.input = input; } /** * @return number of bytes read */ @ApiModelProperty("The number of bytes read in the last 5 minutes.") public String getRead() { return read; } public void setRead(String read) { this.read = read; } /** * @return number of bytes written */ @ApiModelProperty("The number of bytes written in the last 5 minutes.") public String getWritten() { return written; } public void setWritten(String written) { this.written = written; } /** * @return the ID of the Process Group to which this processor belongs. */ @ApiModelProperty("The id of the parent process group to which the processor belongs.") public String getGroupId() { return groupId; } public void setGroupId(final String groupId) { this.groupId = groupId; } /** * @return The total count and size of flow files that have been processed in the last five minutes */ @ApiModelProperty("The count/size of flowfiles that have been processed in the last 5 minutes.") public String getOutput() { return output; } public void setOutput(String output) { this.output = output; } /** * @return number of threads currently running for this Processor */ @ApiModelProperty("The number of threads currently executing in the processor.") public Integer getActiveThreadCount() { return activeThreadCount; } public void setActiveThreadCount(Integer threadCount) { this.activeThreadCount = threadCount; } /** * @return number of task this connectable has had over the last 5 minutes */ @ApiModelProperty("The total number of task this connectable has completed over the last 5 minutes.") public String getTasks() { return tasks; } public void setTasks(String tasks) { this.tasks = tasks; } /** * @return total duration of all tasks for this connectable over the last 5 minutes */ @ApiModelProperty("The total duration of all tasks for this connectable over the last 5 minutes.") public String getTasksDuration() { return tasksDuration; } public void setTasksDuration(String tasksDuration) { this.tasksDuration = tasksDuration; } @ApiModelProperty("The number of bytes read by this Processor in the last 5 mintues") public Long getBytesRead() { return bytesRead; } public void setBytesRead(Long bytesRead) { this.bytesRead = bytesRead; } @ApiModelProperty("The number of bytes written by this Processor in the last 5 minutes") public Long getBytesWritten() { return bytesWritten; } public void setBytesWritten(Long bytesWritten) { this.bytesWritten = bytesWritten; } @ApiModelProperty("The number of FlowFiles that have been accepted in the last 5 minutes") public Integer getFlowFilesIn() { return flowFilesIn; } public void setFlowFilesIn(Integer flowFilesIn) { this.flowFilesIn = flowFilesIn; } @ApiModelProperty("The size of the FlowFiles that have been accepted in the last 5 minutes") public Long getBytesIn() { return bytesIn; } public void setBytesIn(Long bytesIn) { this.bytesIn = bytesIn; } @ApiModelProperty("The number of FlowFiles transferred to a Connection in the last 5 minutes") public Integer getFlowFilesOut() { return flowFilesOut; } public void setFlowFilesOut(Integer flowFilesOut) { this.flowFilesOut = flowFilesOut; } @ApiModelProperty("The size of the FlowFiles transferred to a Connection in the last 5 minutes") public Long getBytesOut() { return bytesOut; } public void setBytesOut(Long bytesOut) { this.bytesOut = bytesOut; } @ApiModelProperty("The number of times this Processor has run in the last 5 minutes") public Integer getTaskCount() { return taskCount; } public void setTaskCount(Integer taskCount) { this.taskCount = taskCount; } @ApiModelProperty("The number of nanoseconds that this Processor has spent running in the last 5 minutes") public Long getTasksDurationNanos() { return tasksDurationNanos; } public void setTasksDurationNanos(Long taskNanos) { this.tasksDurationNanos = taskNanos; } @Override public ProcessorStatusSnapshotDTO clone() { final ProcessorStatusSnapshotDTO other = new ProcessorStatusSnapshotDTO(); other.setId(getId()); other.setGroupId(getGroupId()); other.setName(getName()); other.setType(getType()); other.setRunStatus(getRunStatus()); other.setBytesRead(getBytesRead()); other.setBytesWritten(getBytesWritten()); other.setFlowFilesIn(getFlowFilesIn()); other.setBytesIn(getBytesIn()); other.setFlowFilesOut(getFlowFilesOut()); other.setBytesOut(getBytesOut()); other.setTaskCount(getTaskCount()); other.setTasksDuration(getTasksDuration()); other.setTasksDurationNanos(getTasksDurationNanos()); other.setActiveThreadCount(getActiveThreadCount()); other.setInput(getInput()); other.setOutput(getOutput()); other.setRead(getRead()); other.setWritten(getWritten()); other.setTasks(getTasks()); return other; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.nifi.cluster.manager; import org.apache.nifi.controller.status.RunStatus; import org.apache.nifi.controller.status.TransmissionStatus; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CountersDTO; import org.apache.nifi.web.api.dto.CountersSnapshotDTO; import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO; import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO; import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.StorageUsageDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.PortStatusDTO; import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity; import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; public class StatusMerger { public static void merge(final ControllerStatusDTO target, final ControllerStatusDTO toMerge) { if (target == null || toMerge == null) { return; } target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued()); target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued()); updatePrettyPrintedFields(target); } public static void updatePrettyPrintedFields(final ControllerStatusDTO target) { target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); } public static void merge(final ProcessGroupStatusDTO target, final boolean targetReadablePermission, final ProcessGroupStatusDTO toMerge, final boolean toMergeReadablePermission, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { if (targetReadablePermission && !toMergeReadablePermission) { target.setId(toMerge.getId()); target.setName(toMerge.getName()); } merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); if (target.getNodeSnapshots() != null) { final NodeProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeProcessGroupStatusSnapshotDTO(); nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); nodeSnapshot.setAddress(nodeAddress); nodeSnapshot.setApiPort(nodeApiPort); nodeSnapshot.setNodeId(nodeId); target.getNodeSnapshots().add(nodeSnapshot); } } public static void merge(final ProcessGroupStatusSnapshotEntity target, ProcessGroupStatusSnapshotEntity toMerge) { if (target == null || toMerge == null) { return; } merge(target.getProcessGroupStatusSnapshot(), target.getCanRead(), toMerge.getProcessGroupStatusSnapshot(), toMerge.getCanRead()); } public static void merge(final ProcessGroupStatusSnapshotDTO target, final boolean targetReadablePermission, final ProcessGroupStatusSnapshotDTO toMerge, final boolean toMergeReadablePermission) { if (target == null || toMerge == null) { return; } if (targetReadablePermission && !toMergeReadablePermission) { target.setId(toMerge.getId()); target.setName(toMerge.getName()); } target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn()); target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued()); target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued()); target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead()); target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten()); target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut()); target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut()); target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred()); target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred()); target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived()); target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived()); target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent()); target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent()); target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); updatePrettyPrintedFields(target); // connection status // sort by id final Map<String, ConnectionStatusSnapshotEntity> mergedConnectionMap = new HashMap<>(); for (final ConnectionStatusSnapshotEntity status : replaceNull(target.getConnectionStatusSnapshots())) { mergedConnectionMap.put(status.getId(), status); } for (final ConnectionStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getConnectionStatusSnapshots())) { ConnectionStatusSnapshotEntity merged = mergedConnectionMap.get(statusToMerge.getId()); if (merged == null) { mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; } merge(merged, statusToMerge); } target.setConnectionStatusSnapshots(mergedConnectionMap.values()); // processor status final Map<String, ProcessorStatusSnapshotEntity> mergedProcessorMap = new HashMap<>(); for (final ProcessorStatusSnapshotEntity status : replaceNull(target.getProcessorStatusSnapshots())) { mergedProcessorMap.put(status.getId(), status); } for (final ProcessorStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getProcessorStatusSnapshots())) { ProcessorStatusSnapshotEntity merged = mergedProcessorMap.get(statusToMerge.getId()); if (merged == null) { mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; } merge(merged, statusToMerge); } target.setProcessorStatusSnapshots(mergedProcessorMap.values()); // input ports final Map<String, PortStatusSnapshotEntity> mergedInputPortMap = new HashMap<>(); for (final PortStatusSnapshotEntity status : replaceNull(target.getInputPortStatusSnapshots())) { mergedInputPortMap.put(status.getId(), status); } for (final PortStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getInputPortStatusSnapshots())) { PortStatusSnapshotEntity merged = mergedInputPortMap.get(statusToMerge.getId()); if (merged == null) { mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; } merge(merged, statusToMerge); } target.setInputPortStatusSnapshots(mergedInputPortMap.values()); // output ports final Map<String, PortStatusSnapshotEntity> mergedOutputPortMap = new HashMap<>(); for (final PortStatusSnapshotEntity status : replaceNull(target.getOutputPortStatusSnapshots())) { mergedOutputPortMap.put(status.getId(), status); } for (final PortStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getOutputPortStatusSnapshots())) { PortStatusSnapshotEntity merged = mergedOutputPortMap.get(statusToMerge.getId()); if (merged == null) { mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; } merge(merged, statusToMerge); } target.setOutputPortStatusSnapshots(mergedOutputPortMap.values()); // child groups final Map<String, ProcessGroupStatusSnapshotEntity> mergedGroupMap = new HashMap<>(); for (final ProcessGroupStatusSnapshotEntity status : replaceNull(target.getProcessGroupStatusSnapshots())) { mergedGroupMap.put(status.getId(), status); } for (final ProcessGroupStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getProcessGroupStatusSnapshots())) { ProcessGroupStatusSnapshotEntity merged = mergedGroupMap.get(statusToMerge.getId()); if (merged == null) { mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; } merge(merged, statusToMerge); } target.setOutputPortStatusSnapshots(mergedOutputPortMap.values()); // remote groups final Map<String, RemoteProcessGroupStatusSnapshotEntity> mergedRemoteGroupMap = new HashMap<>(); for (final RemoteProcessGroupStatusSnapshotEntity status : replaceNull(target.getRemoteProcessGroupStatusSnapshots())) { mergedRemoteGroupMap.put(status.getId(), status); } for (final RemoteProcessGroupStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getRemoteProcessGroupStatusSnapshots())) { RemoteProcessGroupStatusSnapshotEntity merged = mergedRemoteGroupMap.get(statusToMerge.getId()); if (merged == null) { mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone()); continue; } merge(merged, statusToMerge); } target.setRemoteProcessGroupStatusSnapshots(mergedRemoteGroupMap.values()); } private static <T> Collection<T> replaceNull(final Collection<T> collection) { return (collection == null) ? Collections.<T>emptyList() : collection; } /** * Updates the fields that are "pretty printed" based on the raw values currently set. For example, * {@link ProcessGroupStatusSnapshotDTO#setInput(String)} will be called with the pretty-printed form of the * FlowFile counts and sizes retrieved via {@link ProcessGroupStatusSnapshotDTO#getFlowFilesIn()} and * {@link ProcessGroupStatusSnapshotDTO#getBytesIn()}. * <p> * This logic is performed here, rather than in the DTO itself because the DTO needs to be kept purely * getters & setters - otherwise the automatic marshalling and unmarshalling to/from JSON becomes very * complicated. * * @param target the DTO to update */ public static void updatePrettyPrintedFields(final ProcessGroupStatusSnapshotDTO target) { target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); target.setQueuedCount(formatCount(target.getFlowFilesQueued())); target.setQueuedSize(formatDataSize(target.getBytesQueued())); target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn())); target.setRead(formatDataSize(target.getBytesRead())); target.setWritten(formatDataSize(target.getBytesWritten())); target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut())); target.setTransferred(prettyPrint(target.getFlowFilesTransferred(), target.getBytesTransferred())); target.setReceived(prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived())); target.setSent(prettyPrint(target.getFlowFilesSent(), target.getBytesSent())); } public static void merge(final RemoteProcessGroupStatusDTO target, final boolean targetReadablePermission, final RemoteProcessGroupStatusDTO toMerge, final boolean toMergeReadablePermission, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { if (targetReadablePermission && !toMergeReadablePermission) { target.setGroupId(toMerge.getGroupId()); target.setId(toMerge.getId()); target.setName(toMerge.getName()); target.setTargetUri(toMerge.getTargetUri()); } merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); if (target.getNodeSnapshots() != null) { final NodeRemoteProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO(); nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); nodeSnapshot.setAddress(nodeAddress); nodeSnapshot.setApiPort(nodeApiPort); nodeSnapshot.setNodeId(nodeId); target.getNodeSnapshots().add(nodeSnapshot); } } public static void merge(final PortStatusDTO target, final boolean targetReadablePermission, final PortStatusDTO toMerge, final boolean toMergeReadablePermission, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { if (targetReadablePermission && !toMergeReadablePermission) { target.setGroupId(toMerge.getGroupId()); target.setId(toMerge.getId()); target.setName(toMerge.getName()); } merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); if (target.getNodeSnapshots() != null) { final NodePortStatusSnapshotDTO nodeSnapshot = new NodePortStatusSnapshotDTO(); nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); nodeSnapshot.setAddress(nodeAddress); nodeSnapshot.setApiPort(nodeApiPort); nodeSnapshot.setNodeId(nodeId); target.getNodeSnapshots().add(nodeSnapshot); } } public static void merge(final ConnectionStatusDTO target, final boolean targetReadablePermission, final ConnectionStatusDTO toMerge, final boolean toMergeReadablePermission, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { if (targetReadablePermission && !toMergeReadablePermission) { target.setGroupId(toMerge.getGroupId()); target.setId(toMerge.getId()); target.setName(toMerge.getName()); target.setSourceId(toMerge.getSourceId()); target.setSourceName(toMerge.getSourceName()); target.setDestinationId(toMerge.getDestinationId()); target.setDestinationName(toMerge.getDestinationName()); } merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); if (target.getNodeSnapshots() != null) { final NodeConnectionStatusSnapshotDTO nodeSnapshot = new NodeConnectionStatusSnapshotDTO(); nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); nodeSnapshot.setAddress(nodeAddress); nodeSnapshot.setApiPort(nodeApiPort); nodeSnapshot.setNodeId(nodeId); target.getNodeSnapshots().add(nodeSnapshot); } } public static void merge(final ProcessorStatusDTO target, final boolean targetReadablePermission, final ProcessorStatusDTO toMerge, final boolean toMergeReadablePermission, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { if (targetReadablePermission && !toMergeReadablePermission) { target.setGroupId(toMerge.getGroupId()); target.setId(toMerge.getId()); target.setName(toMerge.getName()); target.setType(toMerge.getType()); } merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission); if (target.getNodeSnapshots() != null) { final NodeProcessorStatusSnapshotDTO nodeSnapshot = new NodeProcessorStatusSnapshotDTO(); nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot()); nodeSnapshot.setAddress(nodeAddress); nodeSnapshot.setApiPort(nodeApiPort); nodeSnapshot.setNodeId(nodeId); target.getNodeSnapshots().add(nodeSnapshot); } } public static void merge(final ProcessorStatusSnapshotEntity target, ProcessorStatusSnapshotEntity toMerge) { if (target == null || toMerge == null) { return; } merge(target.getProcessorStatusSnapshot(), target.getCanRead(), toMerge.getProcessorStatusSnapshot(), toMerge.getCanRead()); } public static void merge(final ProcessorStatusSnapshotDTO target, final boolean targetReadablePermission, final ProcessorStatusSnapshotDTO toMerge, final boolean toMergeReadablePermission) { if (target == null || toMerge == null) { return; } if (targetReadablePermission && !toMergeReadablePermission) { target.setGroupId(toMerge.getGroupId()); target.setId(toMerge.getId()); target.setName(toMerge.getName()); target.setType(toMerge.getType()); } // if the status to merge is invalid allow it to take precedence. whether the // processor run status is disabled/stopped/running is part of the flow configuration // and should not differ amongst nodes. however, whether a processor is invalid // can be driven by environmental conditions. this check allows any of those to // take precedence over the configured run status. if (RunStatus.Invalid.name().equals(toMerge.getRunStatus())) { target.setRunStatus(RunStatus.Invalid.name()); } target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead()); target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten()); target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn()); target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut()); target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut()); target.setTaskCount(target.getTaskCount() + toMerge.getTaskCount()); target.setTasksDurationNanos(target.getTasksDurationNanos() + toMerge.getTasksDurationNanos()); target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); updatePrettyPrintedFields(target); } public static void updatePrettyPrintedFields(final ProcessorStatusSnapshotDTO target) { target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn())); target.setRead(formatDataSize(target.getBytesRead())); target.setWritten(formatDataSize(target.getBytesWritten())); target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut())); final Integer taskCount = target.getTaskCount(); final String tasks = (taskCount == null) ? "-" : formatCount(taskCount); target.setTasks(tasks); target.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(target.getTasksDurationNanos(), TimeUnit.NANOSECONDS)); } public static void merge(final ConnectionStatusSnapshotEntity target, ConnectionStatusSnapshotEntity toMerge) { if (target == null || toMerge == null) { return; } merge(target.getConnectionStatusSnapshot(), target.getCanRead(), toMerge.getConnectionStatusSnapshot(), toMerge.getCanRead()); } public static void merge(final ConnectionStatusSnapshotDTO target, final boolean targetReadablePermission, final ConnectionStatusSnapshotDTO toMerge, final boolean toMergeReadablePermission) { if (target == null || toMerge == null) { return; } if (targetReadablePermission && !toMergeReadablePermission) { target.setGroupId(toMerge.getGroupId()); target.setId(toMerge.getId()); target.setName(toMerge.getName()); target.setSourceId(toMerge.getSourceId()); target.setSourceName(toMerge.getSourceName()); target.setDestinationId(toMerge.getDestinationId()); target.setDestinationName(toMerge.getDestinationName()); } target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn()); target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut()); target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut()); target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued()); target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued()); updatePrettyPrintedFields(target); } public static void updatePrettyPrintedFields(final ConnectionStatusSnapshotDTO target) { target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); target.setQueuedCount(formatCount(target.getFlowFilesQueued())); target.setQueuedSize(formatDataSize(target.getBytesQueued())); target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn())); target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut())); } public static void merge(final RemoteProcessGroupStatusSnapshotEntity target, RemoteProcessGroupStatusSnapshotEntity toMerge) { if (target == null || toMerge == null) { return; } merge(target.getRemoteProcessGroupStatusSnapshot(), target.getCanRead(), toMerge.getRemoteProcessGroupStatusSnapshot(), toMerge.getCanRead()); } public static void merge(final RemoteProcessGroupStatusSnapshotDTO target, final boolean targetReadablePermission, final RemoteProcessGroupStatusSnapshotDTO toMerge, final boolean toMergeReadablePermission) { if (target == null || toMerge == null) { return; } if (targetReadablePermission && !toMergeReadablePermission) { target.setGroupId(toMerge.getGroupId()); target.setId(toMerge.getId()); target.setName(toMerge.getName()); target.setTargetUri(toMerge.getTargetUri()); } final String transmittingValue = TransmissionStatus.Transmitting.name(); if (transmittingValue.equals(target.getTransmissionStatus()) || transmittingValue.equals(toMerge.getTransmissionStatus())) { target.setTransmissionStatus(transmittingValue); } target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent()); target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent()); target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived()); target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived()); updatePrettyPrintedFields(target); } public static void updatePrettyPrintedFields(final RemoteProcessGroupStatusSnapshotDTO target) { target.setReceived(prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived())); target.setSent(prettyPrint(target.getFlowFilesSent(), target.getBytesSent())); } public static void merge(final PortStatusSnapshotEntity target, PortStatusSnapshotEntity toMerge) { if (target == null || toMerge == null) { return; } merge(target.getPortStatusSnapshot(), target.getCanRead(), toMerge.getPortStatusSnapshot(), toMerge.getCanRead()); } public static void merge(final PortStatusSnapshotDTO target, final boolean targetReadablePermission, final PortStatusSnapshotDTO toMerge, final boolean toMergeReadablePermission) { if (target == null || toMerge == null) { return; } if (targetReadablePermission && !toMergeReadablePermission) { target.setGroupId(toMerge.getGroupId()); target.setId(toMerge.getId()); target.setName(toMerge.getName()); } target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount()); target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn()); target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn()); target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut()); target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut()); target.setTransmitting(Boolean.TRUE.equals(target.isTransmitting()) || Boolean.TRUE.equals(toMerge.isTransmitting())); // should be unnecessary here since ports run status not should be affected by // environmental conditions but doing so in case that changes if (RunStatus.Invalid.name().equals(toMerge.getRunStatus())) { target.setRunStatus(RunStatus.Invalid.name()); } updatePrettyPrintedFields(target); } public static void updatePrettyPrintedFields(final PortStatusSnapshotDTO target) { target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn())); target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut())); } public static void merge(final SystemDiagnosticsDTO target, final SystemDiagnosticsDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); List<NodeSystemDiagnosticsSnapshotDTO> nodeSnapshots = target.getNodeSnapshots(); if (nodeSnapshots == null) { nodeSnapshots = new ArrayList<>(); } final NodeSystemDiagnosticsSnapshotDTO nodeSnapshot = new NodeSystemDiagnosticsSnapshotDTO(); nodeSnapshot.setAddress(nodeAddress); nodeSnapshot.setApiPort(nodeApiPort); nodeSnapshot.setNodeId(nodeId); nodeSnapshot.setSnapshot(toMerge.getAggregateSnapshot()); nodeSnapshots.add(nodeSnapshot); target.setNodeSnapshots(nodeSnapshots); } public static void merge(final SystemDiagnosticsSnapshotDTO target, final SystemDiagnosticsSnapshotDTO toMerge) { if (target == null || toMerge == null) { return; } target.setAvailableProcessors(target.getAvailableProcessors() + toMerge.getAvailableProcessors()); target.setDaemonThreads(target.getDaemonThreads() + toMerge.getDaemonThreads()); target.setFreeHeapBytes(target.getFreeHeapBytes() + toMerge.getFreeHeapBytes()); target.setFreeNonHeapBytes(target.getFreeNonHeapBytes() + toMerge.getFreeNonHeapBytes()); target.setMaxHeapBytes(target.getMaxHeapBytes() + toMerge.getMaxHeapBytes()); target.setMaxNonHeapBytes(target.getMaxNonHeapBytes() + toMerge.getMaxNonHeapBytes()); target.setProcessorLoadAverage(target.getProcessorLoadAverage() + toMerge.getProcessorLoadAverage()); target.setTotalHeapBytes(target.getTotalHeapBytes() + toMerge.getTotalHeapBytes()); target.setTotalNonHeapBytes(target.getTotalNonHeapBytes() + toMerge.getTotalNonHeapBytes()); target.setTotalThreads(target.getTotalThreads() + toMerge.getTotalThreads()); target.setUsedHeapBytes(target.getUsedHeapBytes() + toMerge.getUsedHeapBytes()); target.setUsedNonHeapBytes(target.getUsedNonHeapBytes() + toMerge.getUsedNonHeapBytes()); merge(target.getContentRepositoryStorageUsage(), toMerge.getContentRepositoryStorageUsage()); merge(target.getFlowFileRepositoryStorageUsage(), toMerge.getFlowFileRepositoryStorageUsage()); mergeGarbageCollection(target.getGarbageCollection(), toMerge.getGarbageCollection()); updatePrettyPrintedFields(target); } public static void updatePrettyPrintedFields(final SystemDiagnosticsSnapshotDTO target) { // heap target.setMaxHeap(FormatUtils.formatDataSize(target.getMaxHeapBytes())); target.setTotalHeap(FormatUtils.formatDataSize(target.getTotalHeapBytes())); target.setUsedHeap(FormatUtils.formatDataSize(target.getUsedHeapBytes())); target.setFreeHeap(FormatUtils.formatDataSize(target.getFreeHeapBytes())); if (target.getMaxHeapBytes() != -1) { target.setHeapUtilization(FormatUtils.formatUtilization(getUtilization(target.getUsedHeapBytes(), target.getMaxHeapBytes()))); } // non heap target.setMaxNonHeap(FormatUtils.formatDataSize(target.getMaxNonHeapBytes())); target.setTotalNonHeap(FormatUtils.formatDataSize(target.getTotalNonHeapBytes())); target.setUsedNonHeap(FormatUtils.formatDataSize(target.getUsedNonHeapBytes())); target.setFreeNonHeap(FormatUtils.formatDataSize(target.getFreeNonHeapBytes())); if (target.getMaxNonHeapBytes() != -1) { target.setNonHeapUtilization(FormatUtils.formatUtilization(getUtilization(target.getUsedNonHeapBytes(), target.getMaxNonHeapBytes()))); } } public static void merge(final Set<StorageUsageDTO> targetSet, final Set<StorageUsageDTO> toMerge) { final Map<String, StorageUsageDTO> storageById = new HashMap<>(); for (final StorageUsageDTO targetUsage : targetSet) { storageById.put(targetUsage.getIdentifier(), targetUsage); } for (final StorageUsageDTO usageToMerge : toMerge) { final StorageUsageDTO targetUsage = storageById.get(usageToMerge.getIdentifier()); if (targetUsage == null) { storageById.put(usageToMerge.getIdentifier(), usageToMerge); } else { merge(targetUsage, usageToMerge); } } targetSet.clear(); targetSet.addAll(storageById.values()); } public static void merge(final StorageUsageDTO target, final StorageUsageDTO toMerge) { target.setFreeSpaceBytes(target.getFreeSpaceBytes() + toMerge.getFreeSpaceBytes()); target.setTotalSpaceBytes(target.getTotalSpaceBytes() + toMerge.getTotalSpaceBytes()); target.setUsedSpaceBytes(target.getUsedSpaceBytes() + toMerge.getUsedSpaceBytes()); updatePrettyPrintedFields(target); } public static void updatePrettyPrintedFields(final StorageUsageDTO target) { target.setFreeSpace(FormatUtils.formatDataSize(target.getFreeSpaceBytes())); target.setTotalSpace(FormatUtils.formatDataSize(target.getTotalSpaceBytes())); target.setUsedSpace(FormatUtils.formatDataSize(target.getUsedSpaceBytes())); if (target.getTotalSpaceBytes() != -1) { target.setUtilization(FormatUtils.formatUtilization(getUtilization(target.getUsedSpaceBytes(), target.getTotalSpaceBytes()))); } } public static void mergeGarbageCollection(final Set<GarbageCollectionDTO> targetSet, final Set<GarbageCollectionDTO> toMerge) { final Map<String, GarbageCollectionDTO> storageById = new HashMap<>(); for (final GarbageCollectionDTO targetUsage : targetSet) { storageById.put(targetUsage.getName(), targetUsage); } for (final GarbageCollectionDTO usageToMerge : toMerge) { final GarbageCollectionDTO targetUsage = storageById.get(usageToMerge.getName()); if (targetUsage == null) { storageById.put(usageToMerge.getName(), usageToMerge); } else { merge(targetUsage, usageToMerge); } } targetSet.clear(); targetSet.addAll(storageById.values()); } public static void merge(final GarbageCollectionDTO target, final GarbageCollectionDTO toMerge) { target.setCollectionCount(target.getCollectionCount() + toMerge.getCollectionCount()); target.setCollectionMillis(target.getCollectionMillis() + toMerge.getCollectionMillis()); updatePrettyPrintedFields(target); } public static void updatePrettyPrintedFields(final GarbageCollectionDTO target) { target.setCollectionTime(FormatUtils.formatHoursMinutesSeconds(target.getCollectionMillis(), TimeUnit.MILLISECONDS)); } public static void merge(final CountersDTO target, final CountersDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) { merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot()); List<NodeCountersSnapshotDTO> nodeSnapshots = target.getNodeSnapshots(); if (nodeSnapshots == null) { nodeSnapshots = new ArrayList<>(); } final NodeCountersSnapshotDTO nodeCountersSnapshot = new NodeCountersSnapshotDTO(); nodeCountersSnapshot.setNodeId(nodeId); nodeCountersSnapshot.setAddress(nodeAddress); nodeCountersSnapshot.setApiPort(nodeApiPort); nodeCountersSnapshot.setSnapshot(toMerge.getAggregateSnapshot()); nodeSnapshots.add(nodeCountersSnapshot); target.setNodeSnapshots(nodeSnapshots); } public static void merge(final CountersSnapshotDTO target, final CountersSnapshotDTO toMerge) { final Map<String, CounterDTO> counters = new HashMap<>(); for (final CounterDTO counter : target.getCounters()) { counters.put(counter.getId(), counter); } for (final CounterDTO counter : toMerge.getCounters()) { final CounterDTO existing = counters.get(counter.getId()); if (existing == null) { counters.put(counter.getId(), counter); } else { merge(existing, counter); } } target.setCounters(counters.values()); } public static void merge(final CounterDTO target, final CounterDTO toMerge) { target.setValueCount(target.getValueCount() + toMerge.getValueCount()); target.setValue(FormatUtils.formatCount(target.getValueCount())); } public static int getUtilization(final double used, final double total) { return (int) Math.round((used / total) * 100); } public static String formatCount(final Integer intStatus) { return intStatus == null ? "-" : FormatUtils.formatCount(intStatus); } public static String formatDataSize(final Long longStatus) { return longStatus == null ? "-" : FormatUtils.formatDataSize(longStatus); } public static String prettyPrint(final Integer count, final Long bytes) { return formatCount(count) + " (" + formatDataSize(bytes) + ")"; } }
