Hi Jeff,

Yes,  I took a fresh clone from git hub .  I cleaned my maven repo as well
before building .

 Attached StatusMerger.java and ProcessorStatusSnapshotDTO.java

All,

It will be great if others also check if it is happening when u build. I am
worried whether I am doing some thing really stupid.

Thanks & Regards

Tijo Thomas

On 16-Sep-2016 7:47 pm, "Jeff" <jtsw...@gmail.com> wrote:

> Thank you for the information.  Did you try running the tests on a fresh
> clone of the github repo?
>
> Could you please link me to or include the contents of StatusMerger.java
> and ProcessorStatusSnapshotDTO.java?
>
> On Fri, Sep 16, 2016 at 4:17 AM Tijo Thomas <tijopara...@gmail.com> wrote:
>
>>
>> Output when I ran through IDE,
>>
>>
>> Condition not satisfied:
>>
>> returnedJson == expectedJson
>> |            |  |
>> |            |  {"id":"hidden","groupId":"hidd
>> en","name":"hidden","type":"hidden","bytesRead":0,"bytesWritten":0,"read":"0
>> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:00:00.000","activeThreadCount":0}
>> |            false
>> |            1 difference (99% similarity)
>> |            {"id":"hidden","groupId":"hidden","name":"hidden","type":"
>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0
>> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:(3)0:00.000","activeThreadCount":0}
>> |            {"id":"hidden","groupId":"hidden","name":"hidden","type":"
>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0
>> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:(0)0:00.000","activeThreadCount":0}
>> {"id":"hidden","groupId":"hidden","name":"hidden","type":"
>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0
>> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:30:00.000","activeThreadCount":0}
>>
>> Expected :{"id":"hidden","groupId":"hidden","name":"hidden","type":"
>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0
>> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:00:00.000","activeThreadCount":0}
>>
>> Actual   :{"id":"hidden","groupId":"hidden","name":"hidden","type":"
>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0
>> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:30:00.000","activeThreadCount":0}
>>
>>
>> Taskduration is showing 30 min different when compared to expected and
>> actuals.
>>
>>     at org.apache.nifi.cluster.manager.PermissionBasedStatusMergerSpec.Merge
>> ProcessorStatusSnapshotDTO(PermissionBasedStatusMergerSpec.groovy:257)
>>
>>
>>
>> Condition not satisfied:
>>
>> returnedJson == expectedJson
>> |            |  |
>> |            |  {"id":"hidden","groupId":"hidd
>> en","name":"hidden","type":"hidden","bytesRead":0,"bytesWritten":0,"read":"0
>> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:00:00.000","activeThreadCount":0}
>> |            false
>> |            1 difference (99% similarity)
>> |            {"id":"hidden","groupId":"hidden","name":"hidden","type":"
>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0
>> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:(3)0:00.000","activeThreadCount":0}
>> |            {"id":"hidden","groupId":"hidden","name":"hidden","type":"
>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0
>> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:(0)0:00.000","activeThreadCount":0}
>> {"id":"hidden","groupId":"hidden","name":"hidden","type":"
>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0
>> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:30:00.000","activeThreadCount":0}
>>
>> Expected :{"id":"hidden","groupId":"hidden","name":"hidden","type":"
>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0
>> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:00:00.000","activeThreadCount":0}
>>
>> Actual   :{"id":"hidden","groupId":"hidden","name":"hidden","type":"
>> hidden","bytesRead":0,"bytesWritten":0,"read":"0 bytes","written":"0
>> bytes","flowFilesIn":0,"bytesIn":0,"input":"0 (0
>> bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>> tasksDuration":"00:30:00.000","activeThreadCount":0}
>>
>>
>> mvn --version
>>
>> Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5;
>> 2015-11-10T22:11:47+05:30)
>> Maven home: /home/tijo/apache-maven-3.3.9
>> Java version: 1.8.0_91, vendor: Oracle Corporation
>> Java home: /home/tijo/jdk1.8.0_91/jre
>> Default locale: en_IN, platform encoding: UTF-8
>> OS name: "linux", version: "3.13.0-95-generic", arch: "amd64", family:
>> "unix"
>>
>>
>> Tijo
>>
>>
>>
>> On Thu, Sep 15, 2016 at 8:07 PM, Jeff <jtsw...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Could you try building a fresh clone of the NIFI repo?  What is your dev
>>> environment?  What versions of java and maven are you running?
>>>
>>> Does PermissionBasedStatusMergerSpec work when you run it directly via
>>> maven and/or an IDE?
>>>
>>> On Wed, Sep 14, 2016 at 2:37 PM Tijo Thomas <tijopara...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jeff,
>>>>
>>>> I tried replacing the mvn repo and  did  mvn clean package.
>>>>
>>>> I am still getting this error.
>>>>
>>>> I am sorry I am not getting any clue why it is failing.
>>>>
>>>> Can you check again.
>>>>
>>>> Tijo
>>>>
>>>> On Wed, Sep 14, 2016 at 9:27 PM, Jeff <jtsw...@gmail.com> wrote:
>>>>
>>>>> Ok, sounds good!  Please let us know!
>>>>>
>>>>> On Wed, Sep 14, 2016 at 12:04 AM Tijo Thomas <tijopara...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sorry to reply  late. I was on vacation for last 4 days.
>>>>>>
>>>>>> I have not modified any files.
>>>>>>
>>>>>> I think there is some problem with my repo.  I will make a new repo
>>>>>> and try again.  Still the problem  exist I will post it again in the 
>>>>>> group.
>>>>>>
>>>>>> Thank you very much for your support.
>>>>>>
>>>>>> Tijo
>>>>>>
>>>>>> On 10-Sep-2016 6:44 pm, "Jeff" <jtsw...@gmail.com> wrote:
>>>>>>
>>>>>>> Tijo,
>>>>>>>
>>>>>>> Have you modified ProcessorStatusSnapshotDTO.java or
>>>>>>> PermissionBasedStatusMergerSpec.groovy?
>>>>>>>
>>>>>>> On Sat, Sep 10, 2016 at 7:48 AM Tijo Thomas <tijopara...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jeff
>>>>>>>>
>>>>>>>> I recently  rebase  from master.
>>>>>>>> Then I cloned again and ran mvn  package
>>>>>>>>
>>>>>>>> Tijo
>>>>>>>>
>>>>>>>> On 09-Sep-2016 9:12 pm, "Jeff" <jtsw...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Tijo,
>>>>>>>>>
>>>>>>>>> I just ran this test on master and it's passing for me.  Can you
>>>>>>>>> provide some details about the branch you're on when running the 
>>>>>>>>> tests?  I
>>>>>>>>> see that tasksDuration is 00:30:00.000 when it's expecting 
>>>>>>>>> 00:00:00.000,
>>>>>>>>> and that's why the JSON isn't matching.
>>>>>>>>>
>>>>>>>>> On Thu, Sep 8, 2016 at 4:58 PM Tijo Thomas <tijopara...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>> Nifi test case is failing (PermissionBasedStatusMergerSpec) .
>>>>>>>>>> This is written in Grovy .. not comfortable with Groovy .
>>>>>>>>>>
>>>>>>>>>> Running org.apache.nifi.cluster.manage
>>>>>>>>>> r.PermissionBasedStatusMergerSpec
>>>>>>>>>> Tests run: 20, Failures: 2, Errors: 0, Skipped: 0, Time elapsed:
>>>>>>>>>> 0.922 sec <<< FAILURE! - in org.apache.nifi.cluster.manage
>>>>>>>>>> r.PermissionBasedStatusMergerSpec
>>>>>>>>>> Merge ProcessorStatusSnapshotDTO[0](
>>>>>>>>>> org.apache.nifi.cluster.manager.PermissionBasedStatusMergerSpec)
>>>>>>>>>> Time elapsed: 0.144 sec  <<< FAILURE!
>>>>>>>>>> org.spockframework.runtime.SpockComparisonFailure: Condition not
>>>>>>>>>> satisfied:
>>>>>>>>>>
>>>>>>>>>> returnedJson == expectedJson
>>>>>>>>>> |            |  |
>>>>>>>>>> |            |  {"id":"hidden","groupId":"hidd
>>>>>>>>>> en","name":"hidden","type":"hidden","bytesRead":0,"bytesWritten":0,"read":"0
>>>>>>>>>> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0
>>>>>>>>>> (0 bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>>>>>>>>>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>>>>>>>>>> tasksDuration":"00:00:00.000","activeThreadCount":0}
>>>>>>>>>> |            false
>>>>>>>>>> |            1 difference (99% similarity)
>>>>>>>>>> |            {"id":"hidden","groupId":"hidd
>>>>>>>>>> en","name":"hidden","type":"hidden","bytesRead":0,"bytesWritten":0,"read":"0
>>>>>>>>>> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0
>>>>>>>>>> (0 bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>>>>>>>>>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>>>>>>>>>> tasksDuration":"00:(3)0:00.000","activeThreadCount":0}
>>>>>>>>>> |            {"id":"hidden","groupId":"hidd
>>>>>>>>>> en","name":"hidden","type":"hidden","bytesRead":0,"bytesWritten":0,"read":"0
>>>>>>>>>> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0
>>>>>>>>>> (0 bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>>>>>>>>>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>>>>>>>>>> tasksDuration":"00:(0)0:00.000","activeThreadCount":0}
>>>>>>>>>> {"id":"hidden","groupId":"hidden","name":"hidden","type":"
>>>>>>>>>> hidden","bytesRead":0,"bytesWritten":0,"read":"0
>>>>>>>>>> bytes","written":"0 bytes","flowFilesIn":0,"bytesIn":0,"input":"0
>>>>>>>>>> (0 bytes)","flowFilesOut":0,"bytesOut":0,"output":"0 (0
>>>>>>>>>> bytes)","taskCount":0,"tasksDurationNanos":0,"tasks":"0","
>>>>>>>>>> tasksDuration":"00:30:00.000","activeThreadCount":0}
>>>>>>>>>>
>>>>>>>>>>     at org.apache.nifi.cluster.manage
>>>>>>>>>> r.PermissionBasedStatusMergerSpec.Merge
>>>>>>>>>> ProcessorStatusSnapshotDTO(PermissionBasedStatusMergerSpec.
>>>>>>>>>> groovy:257)
>>>>>>>>>>
>>>>>>>>>> Merge ProcessorStatusSnapshotDTO[1](
>>>>>>>>>> org.apache.nifi.cluster.manager.PermissionBasedStatusMergerSpec)
>>>>>>>>>> Time elapsed: 0.01 sec  <<< FAILURE!
>>>>>>>>>> org.spockframework.runtime.SpockComparisonFailure: Condition not
>>>>>>>>>> satisfied:
>>>>>>>>>>
>>>>>>>>>> Tijo
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>
>>
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.web.api.dto.status;

import com.wordnik.swagger.annotations.ApiModelProperty;

import javax.xml.bind.annotation.XmlType;

/**
 * DTO for serializing the status of a processor.
 */
@XmlType(name = "processorStatusSnapshot")
public class ProcessorStatusSnapshotDTO implements Cloneable {

    private String id;
    private String groupId;
    private String name;
    private String type;
    private String runStatus;

    private Long bytesRead = 0L;
    private Long bytesWritten = 0L;
    private String read;
    private String written;

    private Integer flowFilesIn = 0;
    private Long bytesIn = 0L;
    private String input;

    private Integer flowFilesOut = 0;
    private Long bytesOut = 0L;
    private String output;

    private Integer taskCount = 0;
    private Long tasksDurationNanos = 0L;
    private String tasks;
    private String tasksDuration;
    private Integer activeThreadCount = 0;

    /* getters / setters */
    /**
     * @return The processor id
     */
    @ApiModelProperty("The id of the processor.")
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    /**
     * @return The processor name
     */
    @ApiModelProperty("The name of the prcessor.")
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return The processor type
     */
    @ApiModelProperty("The type of the processor.")
    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    /**
     * @return run status of this processor
     */
    @ApiModelProperty(
            value = "The state of the processor.",
            allowableValues = "RUNNING, STOPPED, DISABLED, INVALID"
    )
    public String getRunStatus() {
        return runStatus;
    }

    public void setRunStatus(String runStatus) {
        this.runStatus = runStatus;
    }

    /**
     * @return The total count and size of flow files that have been accepted in the last five minutes
     */
    @ApiModelProperty("The count/size of flowfiles that have been accepted in the last 5 minutes.")
    public String getInput() {
        return input;
    }

    public void setInput(String input) {
        this.input = input;
    }

    /**
     * @return number of bytes read
     */
    @ApiModelProperty("The number of bytes read in the last 5 minutes.")
    public String getRead() {
        return read;
    }

    public void setRead(String read) {
        this.read = read;
    }

    /**
     * @return number of bytes written
     */
    @ApiModelProperty("The number of bytes written in the last 5 minutes.")
    public String getWritten() {
        return written;
    }

    public void setWritten(String written) {
        this.written = written;
    }

    /**
     * @return the ID of the Process Group to which this processor belongs.
     */
    @ApiModelProperty("The id of the parent process group to which the processor belongs.")
    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(final String groupId) {
        this.groupId = groupId;
    }

    /**
     * @return The total count and size of flow files that have been processed in the last five minutes
     */
    @ApiModelProperty("The count/size of flowfiles that have been processed in the last 5 minutes.")
    public String getOutput() {
        return output;
    }

    public void setOutput(String output) {
        this.output = output;
    }

    /**
     * @return number of threads currently running for this Processor
     */
    @ApiModelProperty("The number of threads currently executing in the processor.")
    public Integer getActiveThreadCount() {
        return activeThreadCount;
    }

    public void setActiveThreadCount(Integer threadCount) {
        this.activeThreadCount = threadCount;
    }

    /**
     * @return number of task this connectable has had over the last 5 minutes
     */
    @ApiModelProperty("The total number of task this connectable has completed over the last 5 minutes.")
    public String getTasks() {
        return tasks;
    }

    public void setTasks(String tasks) {
        this.tasks = tasks;
    }

    /**
     * @return total duration of all tasks for this connectable over the last 5 minutes
     */
    @ApiModelProperty("The total duration of all tasks for this connectable over the last 5 minutes.")
    public String getTasksDuration() {
        return tasksDuration;
    }

    public void setTasksDuration(String tasksDuration) {
        this.tasksDuration = tasksDuration;
    }

    @ApiModelProperty("The number of bytes read by this Processor in the last 5 mintues")
    public Long getBytesRead() {
        return bytesRead;
    }

    public void setBytesRead(Long bytesRead) {
        this.bytesRead = bytesRead;
    }

    @ApiModelProperty("The number of bytes written by this Processor in the last 5 minutes")
    public Long getBytesWritten() {
        return bytesWritten;
    }

    public void setBytesWritten(Long bytesWritten) {
        this.bytesWritten = bytesWritten;
    }

    @ApiModelProperty("The number of FlowFiles that have been accepted in the last 5 minutes")
    public Integer getFlowFilesIn() {
        return flowFilesIn;
    }

    public void setFlowFilesIn(Integer flowFilesIn) {
        this.flowFilesIn = flowFilesIn;
    }

    @ApiModelProperty("The size of the FlowFiles that have been accepted in the last 5 minutes")
    public Long getBytesIn() {
        return bytesIn;
    }

    public void setBytesIn(Long bytesIn) {
        this.bytesIn = bytesIn;
    }

    @ApiModelProperty("The number of FlowFiles transferred to a Connection in the last 5 minutes")
    public Integer getFlowFilesOut() {
        return flowFilesOut;
    }

    public void setFlowFilesOut(Integer flowFilesOut) {
        this.flowFilesOut = flowFilesOut;
    }

    @ApiModelProperty("The size of the FlowFiles transferred to a Connection in the last 5 minutes")
    public Long getBytesOut() {
        return bytesOut;
    }

    public void setBytesOut(Long bytesOut) {
        this.bytesOut = bytesOut;
    }

    @ApiModelProperty("The number of times this Processor has run in the last 5 minutes")
    public Integer getTaskCount() {
        return taskCount;
    }

    public void setTaskCount(Integer taskCount) {
        this.taskCount = taskCount;
    }

    @ApiModelProperty("The number of nanoseconds that this Processor has spent running in the last 5 minutes")
    public Long getTasksDurationNanos() {
        return tasksDurationNanos;
    }

    public void setTasksDurationNanos(Long taskNanos) {
        this.tasksDurationNanos = taskNanos;
    }

    @Override
    public ProcessorStatusSnapshotDTO clone() {
        final ProcessorStatusSnapshotDTO other = new ProcessorStatusSnapshotDTO();
        other.setId(getId());
        other.setGroupId(getGroupId());
        other.setName(getName());
        other.setType(getType());

        other.setRunStatus(getRunStatus());
        other.setBytesRead(getBytesRead());
        other.setBytesWritten(getBytesWritten());
        other.setFlowFilesIn(getFlowFilesIn());
        other.setBytesIn(getBytesIn());
        other.setFlowFilesOut(getFlowFilesOut());
        other.setBytesOut(getBytesOut());
        other.setTaskCount(getTaskCount());
        other.setTasksDuration(getTasksDuration());
        other.setTasksDurationNanos(getTasksDurationNanos());
        other.setActiveThreadCount(getActiveThreadCount());
        other.setInput(getInput());
        other.setOutput(getOutput());
        other.setRead(getRead());
        other.setWritten(getWritten());
        other.setTasks(getTasks());

        return other;
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.cluster.manager;

import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersDTO;
import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
import org.apache.nifi.web.api.dto.NodeCountersSnapshotDTO;
import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO;
import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.StorageUsageDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.NodePortStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessorStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.PortStatusDTO;
import org.apache.nifi.web.api.dto.status.PortStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class StatusMerger {
    public static void merge(final ControllerStatusDTO target, final ControllerStatusDTO toMerge) {
        if (target == null || toMerge == null) {
            return;
        }

        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
        target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
        target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());

        updatePrettyPrintedFields(target);
    }

    public static void updatePrettyPrintedFields(final ControllerStatusDTO target) {
        target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued()));
    }

    public static void merge(final ProcessGroupStatusDTO target, final boolean targetReadablePermission, final ProcessGroupStatusDTO toMerge, final boolean toMergeReadablePermission,
                             final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
        if (targetReadablePermission && !toMergeReadablePermission) {
            target.setId(toMerge.getId());
            target.setName(toMerge.getName());
        }

        merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission);

        if (target.getNodeSnapshots() != null) {
            final NodeProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeProcessGroupStatusSnapshotDTO();
            nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot());
            nodeSnapshot.setAddress(nodeAddress);
            nodeSnapshot.setApiPort(nodeApiPort);
            nodeSnapshot.setNodeId(nodeId);

            target.getNodeSnapshots().add(nodeSnapshot);
        }
    }

    public static void merge(final ProcessGroupStatusSnapshotEntity target, ProcessGroupStatusSnapshotEntity toMerge) {
        if (target == null || toMerge == null) {
            return;
        }

        merge(target.getProcessGroupStatusSnapshot(), target.getCanRead(), toMerge.getProcessGroupStatusSnapshot(), toMerge.getCanRead());
    }

    public static void merge(final ProcessGroupStatusSnapshotDTO target, final boolean targetReadablePermission, final ProcessGroupStatusSnapshotDTO toMerge,
                             final boolean toMergeReadablePermission) {
        if (target == null || toMerge == null) {
            return;
        }

        if (targetReadablePermission && !toMergeReadablePermission) {
            target.setId(toMerge.getId());
            target.setName(toMerge.getName());
        }

        target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
        target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());

        target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
        target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());

        target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
        target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());

        target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
        target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());

        target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred());
        target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred());

        target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
        target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());

        target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
        target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());

        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
        updatePrettyPrintedFields(target);

        // connection status
        // sort by id
        final Map<String, ConnectionStatusSnapshotEntity> mergedConnectionMap = new HashMap<>();
        for (final ConnectionStatusSnapshotEntity status : replaceNull(target.getConnectionStatusSnapshots())) {
            mergedConnectionMap.put(status.getId(), status);
        }

        for (final ConnectionStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getConnectionStatusSnapshots())) {
            ConnectionStatusSnapshotEntity merged = mergedConnectionMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedConnectionMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            merge(merged, statusToMerge);
        }
        target.setConnectionStatusSnapshots(mergedConnectionMap.values());

        // processor status
        final Map<String, ProcessorStatusSnapshotEntity> mergedProcessorMap = new HashMap<>();
        for (final ProcessorStatusSnapshotEntity status : replaceNull(target.getProcessorStatusSnapshots())) {
            mergedProcessorMap.put(status.getId(), status);
        }

        for (final ProcessorStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getProcessorStatusSnapshots())) {
            ProcessorStatusSnapshotEntity merged = mergedProcessorMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedProcessorMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            merge(merged, statusToMerge);
        }
        target.setProcessorStatusSnapshots(mergedProcessorMap.values());


        // input ports
        final Map<String, PortStatusSnapshotEntity> mergedInputPortMap = new HashMap<>();
        for (final PortStatusSnapshotEntity status : replaceNull(target.getInputPortStatusSnapshots())) {
            mergedInputPortMap.put(status.getId(), status);
        }

        for (final PortStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getInputPortStatusSnapshots())) {
            PortStatusSnapshotEntity merged = mergedInputPortMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedInputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            merge(merged, statusToMerge);
        }
        target.setInputPortStatusSnapshots(mergedInputPortMap.values());

        // output ports
        final Map<String, PortStatusSnapshotEntity> mergedOutputPortMap = new HashMap<>();
        for (final PortStatusSnapshotEntity status : replaceNull(target.getOutputPortStatusSnapshots())) {
            mergedOutputPortMap.put(status.getId(), status);
        }

        for (final PortStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getOutputPortStatusSnapshots())) {
            PortStatusSnapshotEntity merged = mergedOutputPortMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedOutputPortMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            merge(merged, statusToMerge);
        }
        target.setOutputPortStatusSnapshots(mergedOutputPortMap.values());

        // child groups
        final Map<String, ProcessGroupStatusSnapshotEntity> mergedGroupMap = new HashMap<>();
        for (final ProcessGroupStatusSnapshotEntity status : replaceNull(target.getProcessGroupStatusSnapshots())) {
            mergedGroupMap.put(status.getId(), status);
        }

        for (final ProcessGroupStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getProcessGroupStatusSnapshots())) {
            ProcessGroupStatusSnapshotEntity merged = mergedGroupMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            merge(merged, statusToMerge);
        }
        target.setOutputPortStatusSnapshots(mergedOutputPortMap.values());

        // remote groups
        final Map<String, RemoteProcessGroupStatusSnapshotEntity> mergedRemoteGroupMap = new HashMap<>();
        for (final RemoteProcessGroupStatusSnapshotEntity status : replaceNull(target.getRemoteProcessGroupStatusSnapshots())) {
            mergedRemoteGroupMap.put(status.getId(), status);
        }

        for (final RemoteProcessGroupStatusSnapshotEntity statusToMerge : replaceNull(toMerge.getRemoteProcessGroupStatusSnapshots())) {
            RemoteProcessGroupStatusSnapshotEntity merged = mergedRemoteGroupMap.get(statusToMerge.getId());
            if (merged == null) {
                mergedRemoteGroupMap.put(statusToMerge.getId(), statusToMerge.clone());
                continue;
            }

            merge(merged, statusToMerge);
        }
        target.setRemoteProcessGroupStatusSnapshots(mergedRemoteGroupMap.values());
    }

    private static <T> Collection<T> replaceNull(final Collection<T> collection) {
        return (collection == null) ? Collections.<T>emptyList() : collection;
    }


    /**
     * Updates the fields that are "pretty printed" based on the raw values currently set. For example,
     * {@link ProcessGroupStatusSnapshotDTO#setInput(String)} will be called with the pretty-printed form of the
     * FlowFile counts and sizes retrieved via {@link ProcessGroupStatusSnapshotDTO#getFlowFilesIn()} and
     * {@link ProcessGroupStatusSnapshotDTO#getBytesIn()}.
     * <p>
     * This logic is performed here, rather than in the DTO itself because the DTO needs to be kept purely
     * getters & setters - otherwise the automatic marshalling and unmarshalling to/from JSON becomes very
     * complicated.
     *
     * @param target the DTO to update
     */
    public static void updatePrettyPrintedFields(final ProcessGroupStatusSnapshotDTO target) {
        target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued()));
        target.setQueuedCount(formatCount(target.getFlowFilesQueued()));
        target.setQueuedSize(formatDataSize(target.getBytesQueued()));
        target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn()));
        target.setRead(formatDataSize(target.getBytesRead()));
        target.setWritten(formatDataSize(target.getBytesWritten()));
        target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut()));
        target.setTransferred(prettyPrint(target.getFlowFilesTransferred(), target.getBytesTransferred()));
        target.setReceived(prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived()));
        target.setSent(prettyPrint(target.getFlowFilesSent(), target.getBytesSent()));
    }

    public static void merge(final RemoteProcessGroupStatusDTO target, final boolean targetReadablePermission, final RemoteProcessGroupStatusDTO toMerge,
                             final boolean toMergeReadablePermission, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
        if (targetReadablePermission && !toMergeReadablePermission) {
            target.setGroupId(toMerge.getGroupId());
            target.setId(toMerge.getId());
            target.setName(toMerge.getName());
            target.setTargetUri(toMerge.getTargetUri());
        }

        merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission);

        if (target.getNodeSnapshots() != null) {
            final NodeRemoteProcessGroupStatusSnapshotDTO nodeSnapshot = new NodeRemoteProcessGroupStatusSnapshotDTO();
            nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot());
            nodeSnapshot.setAddress(nodeAddress);
            nodeSnapshot.setApiPort(nodeApiPort);
            nodeSnapshot.setNodeId(nodeId);

            target.getNodeSnapshots().add(nodeSnapshot);
        }
    }

    public static void merge(final PortStatusDTO target, final boolean targetReadablePermission, final PortStatusDTO toMerge, final boolean toMergeReadablePermission, final String nodeId,
                             final String nodeAddress, final Integer nodeApiPort) {
        if (targetReadablePermission && !toMergeReadablePermission) {
            target.setGroupId(toMerge.getGroupId());
            target.setId(toMerge.getId());
            target.setName(toMerge.getName());
        }

        merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission);

        if (target.getNodeSnapshots() != null) {
            final NodePortStatusSnapshotDTO nodeSnapshot = new NodePortStatusSnapshotDTO();
            nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot());
            nodeSnapshot.setAddress(nodeAddress);
            nodeSnapshot.setApiPort(nodeApiPort);
            nodeSnapshot.setNodeId(nodeId);

            target.getNodeSnapshots().add(nodeSnapshot);
        }
    }

    public static void merge(final ConnectionStatusDTO target, final boolean targetReadablePermission, final ConnectionStatusDTO toMerge, final boolean toMergeReadablePermission,
                             final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
        if (targetReadablePermission && !toMergeReadablePermission) {
            target.setGroupId(toMerge.getGroupId());
            target.setId(toMerge.getId());
            target.setName(toMerge.getName());
            target.setSourceId(toMerge.getSourceId());
            target.setSourceName(toMerge.getSourceName());
            target.setDestinationId(toMerge.getDestinationId());
            target.setDestinationName(toMerge.getDestinationName());
        }

        merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission);

        if (target.getNodeSnapshots() != null) {
            final NodeConnectionStatusSnapshotDTO nodeSnapshot = new NodeConnectionStatusSnapshotDTO();
            nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot());
            nodeSnapshot.setAddress(nodeAddress);
            nodeSnapshot.setApiPort(nodeApiPort);
            nodeSnapshot.setNodeId(nodeId);

            target.getNodeSnapshots().add(nodeSnapshot);
        }
    }

    public static void merge(final ProcessorStatusDTO target, final boolean targetReadablePermission, final ProcessorStatusDTO toMerge, final boolean toMergeReadablePermission,
                             final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
        if (targetReadablePermission && !toMergeReadablePermission) {
            target.setGroupId(toMerge.getGroupId());
            target.setId(toMerge.getId());
            target.setName(toMerge.getName());
            target.setType(toMerge.getType());
        }

        merge(target.getAggregateSnapshot(), targetReadablePermission, toMerge.getAggregateSnapshot(), toMergeReadablePermission);

        if (target.getNodeSnapshots() != null) {
            final NodeProcessorStatusSnapshotDTO nodeSnapshot = new NodeProcessorStatusSnapshotDTO();
            nodeSnapshot.setStatusSnapshot(toMerge.getAggregateSnapshot());
            nodeSnapshot.setAddress(nodeAddress);
            nodeSnapshot.setApiPort(nodeApiPort);
            nodeSnapshot.setNodeId(nodeId);

            target.getNodeSnapshots().add(nodeSnapshot);
        }
    }

    public static void merge(final ProcessorStatusSnapshotEntity target, ProcessorStatusSnapshotEntity toMerge) {
        if (target == null || toMerge == null) {
            return;
        }

        merge(target.getProcessorStatusSnapshot(), target.getCanRead(), toMerge.getProcessorStatusSnapshot(), toMerge.getCanRead());
    }

    public static void merge(final ProcessorStatusSnapshotDTO target, final boolean targetReadablePermission, final ProcessorStatusSnapshotDTO toMerge,
                             final boolean toMergeReadablePermission) {
        if (target == null || toMerge == null) {
            return;
        }

        if (targetReadablePermission && !toMergeReadablePermission) {
            target.setGroupId(toMerge.getGroupId());
            target.setId(toMerge.getId());
            target.setName(toMerge.getName());
            target.setType(toMerge.getType());
        }

        // if the status to merge is invalid allow it to take precedence. whether the
        // processor run status is disabled/stopped/running is part of the flow configuration
        // and should not differ amongst nodes. however, whether a processor is invalid
        // can be driven by environmental conditions. this check allows any of those to
        // take precedence over the configured run status.
        if (RunStatus.Invalid.name().equals(toMerge.getRunStatus())) {
            target.setRunStatus(RunStatus.Invalid.name());
        }

        target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
        target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
        target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
        target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
        target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
        target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
        target.setTaskCount(target.getTaskCount() + toMerge.getTaskCount());
        target.setTasksDurationNanos(target.getTasksDurationNanos() + toMerge.getTasksDurationNanos());
        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
        updatePrettyPrintedFields(target);
    }

    public static void updatePrettyPrintedFields(final ProcessorStatusSnapshotDTO target) {
        target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn()));
        target.setRead(formatDataSize(target.getBytesRead()));
        target.setWritten(formatDataSize(target.getBytesWritten()));
        target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut()));

        final Integer taskCount = target.getTaskCount();
        final String tasks = (taskCount == null) ? "-" : formatCount(taskCount);
        target.setTasks(tasks);

        target.setTasksDuration(FormatUtils.formatHoursMinutesSeconds(target.getTasksDurationNanos(), TimeUnit.NANOSECONDS));
    }

    public static void merge(final ConnectionStatusSnapshotEntity target, ConnectionStatusSnapshotEntity toMerge) {
        if (target == null || toMerge == null) {
            return;
        }

        merge(target.getConnectionStatusSnapshot(), target.getCanRead(), toMerge.getConnectionStatusSnapshot(), toMerge.getCanRead());
    }

    public static void merge(final ConnectionStatusSnapshotDTO target, final boolean targetReadablePermission, final ConnectionStatusSnapshotDTO toMerge,
                             final boolean toMergeReadablePermission) {
        if (target == null || toMerge == null) {
            return;
        }

        if (targetReadablePermission && !toMergeReadablePermission) {
            target.setGroupId(toMerge.getGroupId());
            target.setId(toMerge.getId());
            target.setName(toMerge.getName());
            target.setSourceId(toMerge.getSourceId());
            target.setSourceName(toMerge.getSourceName());
            target.setDestinationId(toMerge.getDestinationId());
            target.setDestinationName(toMerge.getDestinationName());
        }

        target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
        target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
        target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
        target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
        target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
        target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
        updatePrettyPrintedFields(target);
    }

    public static void updatePrettyPrintedFields(final ConnectionStatusSnapshotDTO target) {
        target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued()));
        target.setQueuedCount(formatCount(target.getFlowFilesQueued()));
        target.setQueuedSize(formatDataSize(target.getBytesQueued()));
        target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn()));
        target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut()));
    }


    public static void merge(final RemoteProcessGroupStatusSnapshotEntity target, RemoteProcessGroupStatusSnapshotEntity toMerge) {
        if (target == null || toMerge == null) {
            return;
        }

        merge(target.getRemoteProcessGroupStatusSnapshot(), target.getCanRead(), toMerge.getRemoteProcessGroupStatusSnapshot(), toMerge.getCanRead());
    }

    public static void merge(final RemoteProcessGroupStatusSnapshotDTO target, final boolean targetReadablePermission, final RemoteProcessGroupStatusSnapshotDTO toMerge,
                             final boolean toMergeReadablePermission) {
        if (target == null || toMerge == null) {
            return;
        }

        if (targetReadablePermission && !toMergeReadablePermission) {
            target.setGroupId(toMerge.getGroupId());
            target.setId(toMerge.getId());
            target.setName(toMerge.getName());
            target.setTargetUri(toMerge.getTargetUri());
        }

        final String transmittingValue = TransmissionStatus.Transmitting.name();
        if (transmittingValue.equals(target.getTransmissionStatus()) || transmittingValue.equals(toMerge.getTransmissionStatus())) {
            target.setTransmissionStatus(transmittingValue);
        }

        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());

        target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
        target.setBytesSent(target.getBytesSent() + toMerge.getBytesSent());
        target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
        target.setBytesReceived(target.getBytesReceived() + toMerge.getBytesReceived());
        updatePrettyPrintedFields(target);
    }

    public static void updatePrettyPrintedFields(final RemoteProcessGroupStatusSnapshotDTO target) {
        target.setReceived(prettyPrint(target.getFlowFilesReceived(), target.getBytesReceived()));
        target.setSent(prettyPrint(target.getFlowFilesSent(), target.getBytesSent()));
    }


    public static void merge(final PortStatusSnapshotEntity target, PortStatusSnapshotEntity toMerge) {
        if (target == null || toMerge == null) {
            return;
        }

        merge(target.getPortStatusSnapshot(), target.getCanRead(), toMerge.getPortStatusSnapshot(), toMerge.getCanRead());
    }

    public static void merge(final PortStatusSnapshotDTO target, final boolean targetReadablePermission, final PortStatusSnapshotDTO toMerge, final boolean toMergeReadablePermission) {
        if (target == null || toMerge == null) {
            return;
        }

        if (targetReadablePermission && !toMergeReadablePermission) {
            target.setGroupId(toMerge.getGroupId());
            target.setId(toMerge.getId());
            target.setName(toMerge.getName());
        }

        target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
        target.setFlowFilesIn(target.getFlowFilesIn() + toMerge.getFlowFilesIn());
        target.setBytesIn(target.getBytesIn() + toMerge.getBytesIn());
        target.setFlowFilesOut(target.getFlowFilesOut() + toMerge.getFlowFilesOut());
        target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
        target.setTransmitting(Boolean.TRUE.equals(target.isTransmitting()) || Boolean.TRUE.equals(toMerge.isTransmitting()));

        // should be unnecessary here since ports run status not should be affected by
        // environmental conditions but doing so in case that changes
        if (RunStatus.Invalid.name().equals(toMerge.getRunStatus())) {
            target.setRunStatus(RunStatus.Invalid.name());
        }

        updatePrettyPrintedFields(target);
    }

    public static void updatePrettyPrintedFields(final PortStatusSnapshotDTO target) {
        target.setInput(prettyPrint(target.getFlowFilesIn(), target.getBytesIn()));
        target.setOutput(prettyPrint(target.getFlowFilesOut(), target.getBytesOut()));
    }


    public static void merge(final SystemDiagnosticsDTO target, final SystemDiagnosticsDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());

        List<NodeSystemDiagnosticsSnapshotDTO> nodeSnapshots = target.getNodeSnapshots();
        if (nodeSnapshots == null) {
            nodeSnapshots = new ArrayList<>();
        }

        final NodeSystemDiagnosticsSnapshotDTO nodeSnapshot = new NodeSystemDiagnosticsSnapshotDTO();
        nodeSnapshot.setAddress(nodeAddress);
        nodeSnapshot.setApiPort(nodeApiPort);
        nodeSnapshot.setNodeId(nodeId);
        nodeSnapshot.setSnapshot(toMerge.getAggregateSnapshot());

        nodeSnapshots.add(nodeSnapshot);
        target.setNodeSnapshots(nodeSnapshots);
    }

    public static void merge(final SystemDiagnosticsSnapshotDTO target, final SystemDiagnosticsSnapshotDTO toMerge) {
        if (target == null || toMerge == null) {
            return;
        }

        target.setAvailableProcessors(target.getAvailableProcessors() + toMerge.getAvailableProcessors());
        target.setDaemonThreads(target.getDaemonThreads() + toMerge.getDaemonThreads());
        target.setFreeHeapBytes(target.getFreeHeapBytes() + toMerge.getFreeHeapBytes());
        target.setFreeNonHeapBytes(target.getFreeNonHeapBytes() + toMerge.getFreeNonHeapBytes());
        target.setMaxHeapBytes(target.getMaxHeapBytes() + toMerge.getMaxHeapBytes());
        target.setMaxNonHeapBytes(target.getMaxNonHeapBytes() + toMerge.getMaxNonHeapBytes());
        target.setProcessorLoadAverage(target.getProcessorLoadAverage() + toMerge.getProcessorLoadAverage());
        target.setTotalHeapBytes(target.getTotalHeapBytes() + toMerge.getTotalHeapBytes());
        target.setTotalNonHeapBytes(target.getTotalNonHeapBytes() + toMerge.getTotalNonHeapBytes());
        target.setTotalThreads(target.getTotalThreads() + toMerge.getTotalThreads());
        target.setUsedHeapBytes(target.getUsedHeapBytes() + toMerge.getUsedHeapBytes());
        target.setUsedNonHeapBytes(target.getUsedNonHeapBytes() + toMerge.getUsedNonHeapBytes());

        merge(target.getContentRepositoryStorageUsage(), toMerge.getContentRepositoryStorageUsage());
        merge(target.getFlowFileRepositoryStorageUsage(), toMerge.getFlowFileRepositoryStorageUsage());
        mergeGarbageCollection(target.getGarbageCollection(), toMerge.getGarbageCollection());

        updatePrettyPrintedFields(target);
    }

    public static void updatePrettyPrintedFields(final SystemDiagnosticsSnapshotDTO target) {
        // heap
        target.setMaxHeap(FormatUtils.formatDataSize(target.getMaxHeapBytes()));
        target.setTotalHeap(FormatUtils.formatDataSize(target.getTotalHeapBytes()));
        target.setUsedHeap(FormatUtils.formatDataSize(target.getUsedHeapBytes()));
        target.setFreeHeap(FormatUtils.formatDataSize(target.getFreeHeapBytes()));
        if (target.getMaxHeapBytes() != -1) {
            target.setHeapUtilization(FormatUtils.formatUtilization(getUtilization(target.getUsedHeapBytes(), target.getMaxHeapBytes())));
        }

        // non heap
        target.setMaxNonHeap(FormatUtils.formatDataSize(target.getMaxNonHeapBytes()));
        target.setTotalNonHeap(FormatUtils.formatDataSize(target.getTotalNonHeapBytes()));
        target.setUsedNonHeap(FormatUtils.formatDataSize(target.getUsedNonHeapBytes()));
        target.setFreeNonHeap(FormatUtils.formatDataSize(target.getFreeNonHeapBytes()));
        if (target.getMaxNonHeapBytes() != -1) {
            target.setNonHeapUtilization(FormatUtils.formatUtilization(getUtilization(target.getUsedNonHeapBytes(), target.getMaxNonHeapBytes())));
        }
    }

    public static void merge(final Set<StorageUsageDTO> targetSet, final Set<StorageUsageDTO> toMerge) {
        final Map<String, StorageUsageDTO> storageById = new HashMap<>();
        for (final StorageUsageDTO targetUsage : targetSet) {
            storageById.put(targetUsage.getIdentifier(), targetUsage);
        }

        for (final StorageUsageDTO usageToMerge : toMerge) {
            final StorageUsageDTO targetUsage = storageById.get(usageToMerge.getIdentifier());
            if (targetUsage == null) {
                storageById.put(usageToMerge.getIdentifier(), usageToMerge);
            } else {
                merge(targetUsage, usageToMerge);
            }
        }

        targetSet.clear();
        targetSet.addAll(storageById.values());
    }

    public static void merge(final StorageUsageDTO target, final StorageUsageDTO toMerge) {
        target.setFreeSpaceBytes(target.getFreeSpaceBytes() + toMerge.getFreeSpaceBytes());
        target.setTotalSpaceBytes(target.getTotalSpaceBytes() + toMerge.getTotalSpaceBytes());
        target.setUsedSpaceBytes(target.getUsedSpaceBytes() + toMerge.getUsedSpaceBytes());
        updatePrettyPrintedFields(target);
    }

    public static void updatePrettyPrintedFields(final StorageUsageDTO target) {
        target.setFreeSpace(FormatUtils.formatDataSize(target.getFreeSpaceBytes()));
        target.setTotalSpace(FormatUtils.formatDataSize(target.getTotalSpaceBytes()));
        target.setUsedSpace(FormatUtils.formatDataSize(target.getUsedSpaceBytes()));

        if (target.getTotalSpaceBytes() != -1) {
            target.setUtilization(FormatUtils.formatUtilization(getUtilization(target.getUsedSpaceBytes(), target.getTotalSpaceBytes())));
        }
    }


    public static void mergeGarbageCollection(final Set<GarbageCollectionDTO> targetSet, final Set<GarbageCollectionDTO> toMerge) {
        final Map<String, GarbageCollectionDTO> storageById = new HashMap<>();
        for (final GarbageCollectionDTO targetUsage : targetSet) {
            storageById.put(targetUsage.getName(), targetUsage);
        }

        for (final GarbageCollectionDTO usageToMerge : toMerge) {
            final GarbageCollectionDTO targetUsage = storageById.get(usageToMerge.getName());
            if (targetUsage == null) {
                storageById.put(usageToMerge.getName(), usageToMerge);
            } else {
                merge(targetUsage, usageToMerge);
            }
        }

        targetSet.clear();
        targetSet.addAll(storageById.values());
    }

    public static void merge(final GarbageCollectionDTO target, final GarbageCollectionDTO toMerge) {
        target.setCollectionCount(target.getCollectionCount() + toMerge.getCollectionCount());
        target.setCollectionMillis(target.getCollectionMillis() + toMerge.getCollectionMillis());
        updatePrettyPrintedFields(target);
    }

    public static void updatePrettyPrintedFields(final GarbageCollectionDTO target) {
        target.setCollectionTime(FormatUtils.formatHoursMinutesSeconds(target.getCollectionMillis(), TimeUnit.MILLISECONDS));
    }

    public static void merge(final CountersDTO target, final CountersDTO toMerge, final String nodeId, final String nodeAddress, final Integer nodeApiPort) {
        merge(target.getAggregateSnapshot(), toMerge.getAggregateSnapshot());

        List<NodeCountersSnapshotDTO> nodeSnapshots = target.getNodeSnapshots();
        if (nodeSnapshots == null) {
            nodeSnapshots = new ArrayList<>();
        }

        final NodeCountersSnapshotDTO nodeCountersSnapshot = new NodeCountersSnapshotDTO();
        nodeCountersSnapshot.setNodeId(nodeId);
        nodeCountersSnapshot.setAddress(nodeAddress);
        nodeCountersSnapshot.setApiPort(nodeApiPort);
        nodeCountersSnapshot.setSnapshot(toMerge.getAggregateSnapshot());

        nodeSnapshots.add(nodeCountersSnapshot);

        target.setNodeSnapshots(nodeSnapshots);
    }

    public static void merge(final CountersSnapshotDTO target, final CountersSnapshotDTO toMerge) {
        final Map<String, CounterDTO> counters = new HashMap<>();

        for (final CounterDTO counter : target.getCounters()) {
            counters.put(counter.getId(), counter);
        }

        for (final CounterDTO counter : toMerge.getCounters()) {
            final CounterDTO existing = counters.get(counter.getId());
            if (existing == null) {
                counters.put(counter.getId(), counter);
            } else {
                merge(existing, counter);
            }
        }

        target.setCounters(counters.values());
    }

    public static void merge(final CounterDTO target, final CounterDTO toMerge) {
        target.setValueCount(target.getValueCount() + toMerge.getValueCount());
        target.setValue(FormatUtils.formatCount(target.getValueCount()));
    }


    public static int getUtilization(final double used, final double total) {
        return (int) Math.round((used / total) * 100);
    }

    public static String formatCount(final Integer intStatus) {
        return intStatus == null ? "-" : FormatUtils.formatCount(intStatus);
    }

    public static String formatDataSize(final Long longStatus) {
        return longStatus == null ? "-" : FormatUtils.formatDataSize(longStatus);
    }

    public static String prettyPrint(final Integer count, final Long bytes) {
        return formatCount(count) + " (" + formatDataSize(bytes) + ")";
    }

}

Reply via email to