[1/2] nifi git commit: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask
Repository: nifi Updated Branches: refs/heads/master ce0855e98 -> 6fbe1515e http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java -- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java new file mode 100644 index 000..20416e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.reporting; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.json.Json; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; + +import org.apache.avro.Schema; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.reporting.util.metrics.MetricNames; +import org.apache.nifi.reporting.util.metrics.MetricsService; +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; + +import com.yammer.metrics.core.VirtualMachineMetrics; + +@Tags({"status", "metrics", "site", "site to site"}) +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol.") +public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask { + +static final 
AllowableValue AMBARI_FORMAT = new AllowableValue("ambari-format", "Ambari Format", "Metrics will be formatted" ++ " according to the Ambari Metrics API. See Additional Details in Usage documentation."); +static final AllowableValue RECORD_FORMAT = new AllowableValue("record-format", "Record Format", "Metrics will be formatted" ++ " using the Record Writer property of this reporting task. See Additional Details in Usage documentation to" ++ " have the description of the default schema."); + +static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder() +.name("s2s-metrics-application-id") +.displayName("Application ID") +.description("The Application ID to be included in the metrics") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("nifi") +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() +.name("s2s-metrics-hostname") +.displayName("Hostname") +.description("The Hostna
[1/2] nifi git commit: NIFI-4809 - Implement a SiteToSiteMetricsReportingTask
Repository: nifi Updated Branches: refs/heads/master 844da0634 -> 930417b9d http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java -- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index 526b5d5..d623b6f 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -60,8 +60,6 @@ import org.apache.nifi.remote.TransferDirection; + "However, all process groups are recursively searched for matching components, regardless of whether the process group matches the component filters.") public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTask { -static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; - static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() .name("Platform") .description("The value to use for the platform field in each status record.") @@ -70,6 +68,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa .defaultValue("nifi") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder() .name("Component Type Filter Regex") .description("A regex specifying which component types to report. Any component type matching this regex will be included. 
" @@ -79,6 +78,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa .defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)") .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) .build(); + static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder() .name("Component Name Filter Regex") .description("A regex specifying which component names to report. Any component name matching this regex will be included.") @@ -197,7 +197,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa *The component name * @return Whether the component matches both filters */ -boolean componentMatchesFilters(final String componentType, final String componentName) { +private boolean componentMatchesFilters(final String componentType, final String componentName) { return componentTypeFilter.matcher(componentType).matches() && componentNameFilter.matcher(componentName).matches(); } @@ -221,7 +221,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa * @param parentId *The parent's component id */ -void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, +private void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); @@ -278,7 +278,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } } -void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, +private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, 
final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); @@ -303,7 +303,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } } -void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status, +private void serializePortStatus(final Str