agura commented on code in PR #957: URL: https://github.com/apache/ignite-3/pull/957#discussion_r943269284
########## modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metrics; + +import static java.util.Collections.unmodifiableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Distribution metric calculates counts of measurements that gets into each bounds interval. + * Note, that {@link #value()} will return array length of {@code bounds.length + 1}. + * Last element contains count of measurements bigger than most right value of bounds. + */ +public class DistributionMetric extends AbstractMetric implements CompositeMetric { + /** Count of measurement for each bound. */ + private final AtomicLongArray measurements; + + /** Bounds of measurements. */ + private final long[] bounds; + + /** List of scalar metrics. */ + private volatile List<Metric> scalarMetrics = null; + + /** + * The constructor. + * + * @param name Name. 
+ * @param desc Description. + * @param bounds Bounds of the buckets. + */ + public DistributionMetric(String name, @Nullable String desc, @NotNull long[] bounds) { + super(name, desc); + + assert bounds != null && bounds.length > 0; + assert isSorted(bounds); + + this.bounds = bounds; + this.measurements = new AtomicLongArray(bounds.length + 1); + } + + /** + * Check whether given array is sorted. + * + * @param arr Array to check. + * @return {@code True} if array sorted, {@code false} otherwise. + */ + private static boolean isSorted(@NotNull long[] arr) { + if (arr.length < 2) { + return true; + } + + for (int i = 1; i < arr.length; i++) { + if (arr[i - 1] > arr[i]) { + return false; + } + } + + return true; + } + + /** + * Adds a value to the interval which the value belongs to. + * + * @param x Value. + */ + public void add(long x) { + assert x >= 0; + + //Expect arrays of few elements. + for (int i = 0; i < bounds.length; i++) { + if (x <= bounds[i]) { + measurements.incrementAndGet(i); + + return; + } + } + + measurements.incrementAndGet(bounds.length); + } + + /** {@inheritDoc} */ + public long[] value() { + long[] res = new long[measurements.length()]; + + for (int i = 0; i < measurements.length(); i++) { + res[i] = measurements.get(i); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public @Nullable String getValueAsString() { + StringBuilder sb = new StringBuilder("["); + + List<Metric> scalarMetrics = asScalarMetrics(); + + for (int i = 0; i < scalarMetrics.size(); i++) { + LongMetric m = (LongMetric) scalarMetrics.get(i); + + sb.append(m.name()) + .append(':') + .append(m.value()); + + if (i < scalarMetrics.size() - 1) { + sb.append(", "); + } + } + + sb.append(']'); + + return sb.toString(); + } + + /** + * Bounds of the buckets of distribution. + * + * @return Bounds of the buckets of distribution. 
+ */ + public long[] bounds() { + return bounds; + } + + /** {@inheritDoc} */ + @Override public List<Metric> asScalarMetrics() { Review Comment: Annotation must be on a previous line. It seems that method should not be public. Package private should be enough. ########## modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metrics; + +import static java.util.Collections.unmodifiableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Distribution metric calculates counts of measurements that gets into each bounds interval. + * Note, that {@link #value()} will return array length of {@code bounds.length + 1}. + * Last element contains count of measurements bigger than most right value of bounds. + */ +public class DistributionMetric extends AbstractMetric implements CompositeMetric { + /** Count of measurement for each bound. 
*/ + private final AtomicLongArray measurements; + + /** Bounds of measurements. */ + private final long[] bounds; + + /** List of scalar metrics. */ + private volatile List<Metric> scalarMetrics = null; + + /** + * The constructor. + * + * @param name Name. + * @param desc Description. + * @param bounds Bounds of the buckets. + */ + public DistributionMetric(String name, @Nullable String desc, @NotNull long[] bounds) { + super(name, desc); + + assert bounds != null && bounds.length > 0; + assert isSorted(bounds); + + this.bounds = bounds; + this.measurements = new AtomicLongArray(bounds.length + 1); + } + + /** + * Check whether given array is sorted. + * + * @param arr Array to check. + * @return {@code True} if array sorted, {@code false} otherwise. + */ + private static boolean isSorted(@NotNull long[] arr) { + if (arr.length < 2) { + return true; + } + + for (int i = 1; i < arr.length; i++) { + if (arr[i - 1] > arr[i]) { + return false; + } + } + + return true; + } + + /** + * Adds a value to the interval which the value belongs to. + * + * @param x Value. + */ + public void add(long x) { + assert x >= 0; + + //Expect arrays of few elements. 
+ for (int i = 0; i < bounds.length; i++) { + if (x <= bounds[i]) { + measurements.incrementAndGet(i); + + return; + } + } + + measurements.incrementAndGet(bounds.length); + } + + /** {@inheritDoc} */ + public long[] value() { + long[] res = new long[measurements.length()]; + + for (int i = 0; i < measurements.length(); i++) { + res[i] = measurements.get(i); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public @Nullable String getValueAsString() { + StringBuilder sb = new StringBuilder("["); + + List<Metric> scalarMetrics = asScalarMetrics(); + + for (int i = 0; i < scalarMetrics.size(); i++) { + LongMetric m = (LongMetric) scalarMetrics.get(i); + + sb.append(m.name()) + .append(':') + .append(m.value()); + + if (i < scalarMetrics.size() - 1) { + sb.append(", "); + } + } + + sb.append(']'); + + return sb.toString(); + } + + /** + * Bounds of the buckets of distribution. + * + * @return Bounds of the buckets of distribution. + */ + public long[] bounds() { + return bounds; + } + + /** {@inheritDoc} */ + @Override public List<Metric> asScalarMetrics() { + if (scalarMetrics == null) { + List<Metric> metrics = new ArrayList<>(); + + for (int i = 0; i < measurements.length(); i++) { + String from = i == 0 ? "0" : String.valueOf(bounds[i - 1]); + String to = i == measurements.length() - 1 ? "" : String.valueOf(bounds[i]); + + String name = new StringBuilder(from).append('_').append(to).toString(); + + final int index = i; + LongGauge gauge = new LongGauge(name, "Single distribution bucket", () -> measurements.get(index)); + + metrics.add(gauge); + } + + AtomicReferenceFieldUpdater<DistributionMetric, List> updater = Review Comment: The main idea is to have exactly one instance of an atomic field updater instead of many instances of atomic classes. The current implementation doesn't make sense. See `AbstractMetricSource` for an example of using an atomic field updater. 
########## modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metrics; + +import static java.util.Collections.unmodifiableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Distribution metric calculates counts of measurements that gets into each bounds interval. + * Note, that {@link #value()} will return array length of {@code bounds.length + 1}. + * Last element contains count of measurements bigger than most right value of bounds. + */ +public class DistributionMetric extends AbstractMetric implements CompositeMetric { + /** Count of measurement for each bound. */ + private final AtomicLongArray measurements; + + /** Bounds of measurements. */ + private final long[] bounds; + + /** List of scalar metrics. */ + private volatile List<Metric> scalarMetrics = null; + + /** + * The constructor. + * + * @param name Name. 
+ * @param desc Description. + * @param bounds Bounds of the buckets. + */ + public DistributionMetric(String name, @Nullable String desc, @NotNull long[] bounds) { + super(name, desc); + + assert bounds != null && bounds.length > 0; + assert isSorted(bounds); + + this.bounds = bounds; + this.measurements = new AtomicLongArray(bounds.length + 1); + } + + /** + * Check whether given array is sorted. + * + * @param arr Array to check. + * @return {@code True} if array sorted, {@code false} otherwise. + */ + private static boolean isSorted(@NotNull long[] arr) { + if (arr.length < 2) { + return true; + } + + for (int i = 1; i < arr.length; i++) { + if (arr[i - 1] > arr[i]) { + return false; + } + } + + return true; + } + + /** + * Adds a value to the interval which the value belongs to. + * + * @param x Value. + */ + public void add(long x) { + assert x >= 0; + + //Expect arrays of few elements. + for (int i = 0; i < bounds.length; i++) { + if (x <= bounds[i]) { + measurements.incrementAndGet(i); + + return; + } + } + + measurements.incrementAndGet(bounds.length); + } + + /** {@inheritDoc} */ + public long[] value() { + long[] res = new long[measurements.length()]; + + for (int i = 0; i < measurements.length(); i++) { + res[i] = measurements.get(i); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public @Nullable String getValueAsString() { + StringBuilder sb = new StringBuilder("["); + + List<Metric> scalarMetrics = asScalarMetrics(); + + for (int i = 0; i < scalarMetrics.size(); i++) { + LongMetric m = (LongMetric) scalarMetrics.get(i); + + sb.append(m.name()) + .append(':') + .append(m.value()); + + if (i < scalarMetrics.size() - 1) { + sb.append(", "); + } + } + + sb.append(']'); + + return sb.toString(); + } + + /** + * Bounds of the buckets of distribution. + * + * @return Bounds of the buckets of distribution. 
+ */ + public long[] bounds() { + return bounds; + } + + /** {@inheritDoc} */ + @Override public List<Metric> asScalarMetrics() { + if (scalarMetrics == null) { + List<Metric> metrics = new ArrayList<>(); + + for (int i = 0; i < measurements.length(); i++) { + String from = i == 0 ? "0" : String.valueOf(bounds[i - 1]); + String to = i == measurements.length() - 1 ? "" : String.valueOf(bounds[i]); + + String name = new StringBuilder(from).append('_').append(to).toString(); Review Comment: Redundant `StringBuilder` instance. ########## modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricRegistry.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.metrics; + +import static java.util.Objects.requireNonNull; + +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import org.apache.ignite.internal.util.FilteringIterator; +import org.apache.ignite.internal.util.TransformingIterator; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; + +/** + * Metric registry. Metrics source (see {@link MetricSource} must be registered in this metrics registry after initialization + * of corresponding component and must be unregistered in case of component is destroyed or stopped. Metrics registry also + * provides access to all enabled metrics through corresponding metrics sets. Metrics registry lifetime is equal to the node lifetime. + * <br> + * Implements an {@link Iterable} over the metric sets for enabled metric sources. + */ +public class MetricRegistry implements Iterable<MetricSet> { + private final Lock lock = new ReentrantLock(); + + /** Map of metric sources' names to tuples of registered sources with metric sets, if enabled. */ + private volatile Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources = new TreeMap<>(); + + /** Version always should be changed on metrics enabled/disabled action. */ + private volatile long version; + + /** + * Register metric source. It must be registered in this metrics registry after initialization of corresponding component + * and must be unregistered in case of component is destroyed or stopped, see {@link #unregisterSource(MetricSource)}. + * By registering, the metric source isn't enabled implicitly. + * + * @param src Metric source. + * @throws IllegalStateException If metric source with the given name already exists. 
+ */ + public void registerSource(MetricSource src) { + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> s = new IgniteBiTuple<>(src, null); + + IgniteBiTuple<MetricSource, MetricSet> old = sources.putIfAbsent(src.name(), s); + + if (old != null) { + throw new IllegalStateException("Metrics source with given name already exists: " + src.name()); + } + + // Now we sure that this metric source wasn't registered before. + assert !src.enabled() : "Metric source shouldn't be enabled before registration in registry."; Review Comment: Why shouldn't a metric source be enabled before registering? This is too strict a limitation because the `MetricSource` contract doesn't prohibit it. ########## modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.metrics; + +import static java.util.Collections.unmodifiableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Distribution metric calculates counts of measurements that gets into each bounds interval. + * Note, that {@link #value()} will return array length of {@code bounds.length + 1}. + * Last element contains count of measurements bigger than most right value of bounds. + */ +public class DistributionMetric extends AbstractMetric implements CompositeMetric { + /** Count of measurement for each bound. */ + private final AtomicLongArray measurements; + + /** Bounds of measurements. */ + private final long[] bounds; + + /** List of scalar metrics. */ + private volatile List<Metric> scalarMetrics = null; + + /** + * The constructor. + * + * @param name Name. + * @param desc Description. + * @param bounds Bounds of the buckets. + */ + public DistributionMetric(String name, @Nullable String desc, @NotNull long[] bounds) { + super(name, desc); + + assert bounds != null && bounds.length > 0; + assert isSorted(bounds); + + this.bounds = bounds; + this.measurements = new AtomicLongArray(bounds.length + 1); + } + + /** + * Check whether given array is sorted. + * + * @param arr Array to check. + * @return {@code True} if array sorted, {@code false} otherwise. + */ + private static boolean isSorted(@NotNull long[] arr) { + if (arr.length < 2) { + return true; + } + + for (int i = 1; i < arr.length; i++) { + if (arr[i - 1] > arr[i]) { + return false; + } + } + + return true; + } + + /** + * Adds a value to the interval which the value belongs to. + * + * @param x Value. + */ + public void add(long x) { + assert x >= 0; + + //Expect arrays of few elements. 
+ for (int i = 0; i < bounds.length; i++) { + if (x <= bounds[i]) { + measurements.incrementAndGet(i); + + return; + } + } + + measurements.incrementAndGet(bounds.length); + } + + /** {@inheritDoc} */ + public long[] value() { + long[] res = new long[measurements.length()]; + + for (int i = 0; i < measurements.length(); i++) { + res[i] = measurements.get(i); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public @Nullable String getValueAsString() { + StringBuilder sb = new StringBuilder("["); + + List<Metric> scalarMetrics = asScalarMetrics(); + + for (int i = 0; i < scalarMetrics.size(); i++) { + LongMetric m = (LongMetric) scalarMetrics.get(i); + + sb.append(m.name()) + .append(':') + .append(m.value()); + + if (i < scalarMetrics.size() - 1) { + sb.append(", "); + } + } + + sb.append(']'); + + return sb.toString(); + } + + /** + * Bounds of the buckets of distribution. + * + * @return Bounds of the buckets of distribution. + */ + public long[] bounds() { + return bounds; + } + + /** {@inheritDoc} */ + @Override public List<Metric> asScalarMetrics() { + if (scalarMetrics == null) { + List<Metric> metrics = new ArrayList<>(); + + for (int i = 0; i < measurements.length(); i++) { + String from = i == 0 ? "0" : String.valueOf(bounds[i - 1]); Review Comment: It seems that original implementation doesn't have 0-bucket. Please, recheck `MetricsUtils.histogramBucketNames` method in AI2. ########## modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricRegistry.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metrics; + +import static java.util.Objects.requireNonNull; + +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import org.apache.ignite.internal.util.FilteringIterator; +import org.apache.ignite.internal.util.TransformingIterator; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; + +/** + * Metric registry. Metrics source (see {@link MetricSource} must be registered in this metrics registry after initialization + * of corresponding component and must be unregistered in case of component is destroyed or stopped. Metrics registry also + * provides access to all enabled metrics through corresponding metrics sets. Metrics registry lifetime is equal to the node lifetime. + * <br> + * Implements an {@link Iterable} over the metric sets for enabled metric sources. + */ +public class MetricRegistry implements Iterable<MetricSet> { + private final Lock lock = new ReentrantLock(); + + /** Map of metric sources' names to tuples of registered sources with metric sets, if enabled. */ + private volatile Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources = new TreeMap<>(); + + /** Version always should be changed on metrics enabled/disabled action. */ + private volatile long version; + + /** + * Register metric source. 
It must be registered in this metrics registry after initialization of corresponding component + * and must be unregistered in case of component is destroyed or stopped, see {@link #unregisterSource(MetricSource)}. + * By registering, the metric source isn't enabled implicitly. + * + * @param src Metric source. + * @throws IllegalStateException If metric source with the given name already exists. + */ + public void registerSource(MetricSource src) { + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> s = new IgniteBiTuple<>(src, null); + + IgniteBiTuple<MetricSource, MetricSet> old = sources.putIfAbsent(src.name(), s); + + if (old != null) { + throw new IllegalStateException("Metrics source with given name already exists: " + src.name()); + } + + // Now we sure that this metric source wasn't registered before. + assert !src.enabled() : "Metric source shouldn't be enabled before registration in registry."; + + return true; + }); + } + + /** + * Unregister metric source. It must be unregistered in case of corresponding component is destroyed or stopped. + * Metric source is also disabled while unregistered, see {@link #disable(String)}. + * + * @param src Metric source. + */ + public void unregisterSource(MetricSource src) { + unregisterSource(src.name()); + } + + /** + * Unregister metric source. It must be unregistered in case of corresponding component is destroyed or stopped. + * Metric source is also disabled while unregistered, see {@link #disable(String)}. + * + * @param srcName Metric source name. + */ + public void unregisterSource(String srcName) { + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> s = sources.get(srcName); + + if (s == null) { + return false; + } + + assert s.get1() != null; + + s.get1().disable(); + + sources.remove(srcName); + + return true; + }); + } + + /** + * Enable metric set for the given metric source. + * + * @param src Metric source. 
+ * @return Metric set, or {@code null} if the metric set is already enabled. + * @throws IllegalStateException If metric source isn't registered. + * @throws IllegalArgumentException If metric source isn't the same as registered. + */ + public MetricSet enable(@NotNull MetricSource src) { + AtomicReference<MetricSet> metricSetRef = new AtomicReference<>(); + + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> registered = checkRegistered(sources, src); + + if (registered.get2() != null) { + assert src.enabled(); + return false; + } + + MetricSet metricSet = src.enable(); + + assert metricSet != null; + + IgniteBiTuple<MetricSource, MetricSet> updated = new IgniteBiTuple<>(src, metricSet); + + sources.put(src.name(), updated); + + metricSetRef.set(metricSet); + + return true; + }); + + return metricSetRef.get(); + } + + /** + * Enable metric set for the given metric source. + * + * @param srcName Metric source name. + * @return Metric set, or {@code null} if the metric set is already enabled. + * @throws IllegalStateException If metric source with the given name doesn't exist. + */ + public MetricSet enable(final String srcName) { + AtomicReference<MetricSet> metricSetRef = new AtomicReference<>(); + + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> registered = sources.get(srcName); + + if (registered == null) { + throw new IllegalStateException("Metrics source with given name doesn't exist: " + srcName); + } + + MetricSource src = registered.get1(); + + if (registered.get2() != null) { + assert src.enabled(); + return false; + } + + MetricSet metricSet = src.enable(); + assert metricSet != null; + + IgniteBiTuple<MetricSource, MetricSet> updated = new IgniteBiTuple<>(src, metricSet); + + sources.put(src.name(), updated); + + metricSetRef.set(metricSet); + + return true; + }); + + return metricSetRef.get(); + } + + /** + * Disable metric set for the given metric source. + * + * @param src Metric source. 
+ * @throws IllegalStateException If metric source isn't registered. + * @throws IllegalArgumentException If metric source isn't the same as registered. + */ + public void disable(@NotNull MetricSource src) { + modifySources(sources -> { + checkRegistered(sources, src); + + src.disable(); + + IgniteBiTuple<MetricSource, MetricSet> updated = new IgniteBiTuple<>(src, null); + + sources.put(src.name(), updated); + + return true; + }); + } + + /** + * Disable metric set for the given metric source. + * + * @param srcName Metric source name. + * @throws IllegalStateException If metric source with given name doesn't exists. + */ + public void disable(final String srcName) { + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> registered = sources.get(srcName); + + if (registered == null) { + throw new IllegalStateException("Metrics source with given name doesn't exists: " + srcName); + } + + MetricSource src = registered.get1(); + + src.disable(); + + IgniteBiTuple<MetricSource, MetricSet> updated = new IgniteBiTuple<>(src, null); + + sources.put(src.name(), updated); + + return true; + }); + } + + /** + * Check that the given metric source is registered. + * + * @param sources Sources map. + * @param src Metric source. + * @return Registered pair of metric source and metric set. + * @throws IllegalStateException If metric source isn't registered. + * @throws IllegalArgumentException If metric source isn't the same as registered. 
+ */ + @NotNull + private IgniteBiTuple<MetricSource, MetricSet> checkRegistered( + Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources, + @NotNull MetricSource src + ) { + requireNonNull(src); + + IgniteBiTuple<MetricSource, MetricSet> registered = sources.get(src.name()); + + if (registered == null) { + throw new IllegalStateException("Metrics source isn't registered: " + src.name()); + } + + if (!src.equals(registered.get1())) { + throw new IllegalArgumentException("Given metric source is not the same as registered by the same name: " + src.name()); + } + + return registered; + } + + /** + * Updates {@link MetricRegistry#sources} map using copy-on-write principle. Increments version of registry. + * + * @param modifier Modifier for the sources map. Accepts the new, modifiable version of the map. Returns boolean value, whether + * the map was modified. + */ + private void modifySources(Function<Map<String, IgniteBiTuple<MetricSource, MetricSet>>, Boolean> modifier) { + lock.lock(); + + try { + Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources0 = new TreeMap<>(sources); + + boolean modified = modifier.apply(sources0); + + if (modified) { + sources = sources0; + + version++; Review Comment: This is incorrect behavior because, for example, registering a metric source doesn't change the metrics schema until the source is enabled. The previous implementation was better because metrics are versioned while sources are not. ########## modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricRegistry.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metrics; + +import static java.util.Objects.requireNonNull; + +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import org.apache.ignite.internal.util.FilteringIterator; +import org.apache.ignite.internal.util.TransformingIterator; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; + +/** + * Metric registry. Metrics source (see {@link MetricSource} must be registered in this metrics registry after initialization + * of corresponding component and must be unregistered in case of component is destroyed or stopped. Metrics registry also + * provides access to all enabled metrics through corresponding metrics sets. Metrics registry lifetime is equal to the node lifetime. + * <br> + * Implements an {@link Iterable} over the metric sets for enabled metric sources. + */ +public class MetricRegistry implements Iterable<MetricSet> { + private final Lock lock = new ReentrantLock(); + + /** Map of metric sources' names to tuples of registered sources with metric sets, if enabled. */ + private volatile Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources = new TreeMap<>(); + + /** Version always should be changed on metrics enabled/disabled action. */ + private volatile long version; + + /** + * Register metric source. 
It must be registered in this metrics registry after initialization of corresponding component + * and must be unregistered in case of component is destroyed or stopped, see {@link #unregisterSource(MetricSource)}. + * By registering, the metric source isn't enabled implicitly. + * + * @param src Metric source. + * @throws IllegalStateException If metric source with the given name already exists. + */ + public void registerSource(MetricSource src) { + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> s = new IgniteBiTuple<>(src, null); + + IgniteBiTuple<MetricSource, MetricSet> old = sources.putIfAbsent(src.name(), s); + + if (old != null) { + throw new IllegalStateException("Metrics source with given name already exists: " + src.name()); + } + + // Now we sure that this metric source wasn't registered before. + assert !src.enabled() : "Metric source shouldn't be enabled before registration in registry."; + + return true; + }); + } + + /** + * Unregister metric source. It must be unregistered in case of corresponding component is destroyed or stopped. + * Metric source is also disabled while unregistered, see {@link #disable(String)}. + * + * @param src Metric source. + */ + public void unregisterSource(MetricSource src) { + unregisterSource(src.name()); + } + + /** + * Unregister metric source. It must be unregistered in case of corresponding component is destroyed or stopped. + * Metric source is also disabled while unregistered, see {@link #disable(String)}. + * + * @param srcName Metric source name. + */ + public void unregisterSource(String srcName) { + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> s = sources.get(srcName); + + if (s == null) { + return false; + } + + assert s.get1() != null; + + s.get1().disable(); + + sources.remove(srcName); + + return true; + }); + } + + /** + * Enable metric set for the given metric source. + * + * @param src Metric source. 
+ * @return Metric set, or {@code null} if the metric set is already enabled. + * @throws IllegalStateException If metric source isn't registered. + * @throws IllegalArgumentException If metric source isn't the same as registered. + */ + public MetricSet enable(@NotNull MetricSource src) { + AtomicReference<MetricSet> metricSetRef = new AtomicReference<>(); + + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> registered = checkRegistered(sources, src); + + if (registered.get2() != null) { + assert src.enabled(); + return false; + } + + MetricSet metricSet = src.enable(); + + assert metricSet != null; + + IgniteBiTuple<MetricSource, MetricSet> updated = new IgniteBiTuple<>(src, metricSet); + + sources.put(src.name(), updated); + + metricSetRef.set(metricSet); + + return true; + }); + + return metricSetRef.get(); + } + + /** + * Enable metric set for the given metric source. + * + * @param srcName Metric source name. + * @return Metric set, or {@code null} if the metric set is already enabled. + * @throws IllegalStateException If metric source with the given name doesn't exist. + */ + public MetricSet enable(final String srcName) { + AtomicReference<MetricSet> metricSetRef = new AtomicReference<>(); + + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> registered = sources.get(srcName); + + if (registered == null) { + throw new IllegalStateException("Metrics source with given name doesn't exist: " + srcName); + } + + MetricSource src = registered.get1(); + + if (registered.get2() != null) { + assert src.enabled(); + return false; + } + + MetricSet metricSet = src.enable(); + assert metricSet != null; + + IgniteBiTuple<MetricSource, MetricSet> updated = new IgniteBiTuple<>(src, metricSet); + + sources.put(src.name(), updated); + + metricSetRef.set(metricSet); + + return true; + }); + + return metricSetRef.get(); + } + + /** + * Disable metric set for the given metric source. + * + * @param src Metric source. 
+ * @throws IllegalStateException If metric source isn't registered. + * @throws IllegalArgumentException If metric source isn't the same as registered. + */ + public void disable(@NotNull MetricSource src) { + modifySources(sources -> { + checkRegistered(sources, src); + + src.disable(); + + IgniteBiTuple<MetricSource, MetricSet> updated = new IgniteBiTuple<>(src, null); + + sources.put(src.name(), updated); + + return true; + }); + } + + /** + * Disable metric set for the given metric source. + * + * @param srcName Metric source name. + * @throws IllegalStateException If metric source with given name doesn't exists. + */ + public void disable(final String srcName) { + modifySources(sources -> { + IgniteBiTuple<MetricSource, MetricSet> registered = sources.get(srcName); + + if (registered == null) { + throw new IllegalStateException("Metrics source with given name doesn't exists: " + srcName); + } + + MetricSource src = registered.get1(); + + src.disable(); + + IgniteBiTuple<MetricSource, MetricSet> updated = new IgniteBiTuple<>(src, null); + + sources.put(src.name(), updated); + + return true; + }); + } + + /** + * Check that the given metric source is registered. + * + * @param sources Sources map. + * @param src Metric source. + * @return Registered pair of metric source and metric set. + * @throws IllegalStateException If metric source isn't registered. + * @throws IllegalArgumentException If metric source isn't the same as registered. 
+ */ + @NotNull + private IgniteBiTuple<MetricSource, MetricSet> checkRegistered( + Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources, + @NotNull MetricSource src + ) { + requireNonNull(src); + + IgniteBiTuple<MetricSource, MetricSet> registered = sources.get(src.name()); + + if (registered == null) { + throw new IllegalStateException("Metrics source isn't registered: " + src.name()); + } + + if (!src.equals(registered.get1())) { + throw new IllegalArgumentException("Given metric source is not the same as registered by the same name: " + src.name()); + } + + return registered; + } + + /** + * Updates {@link MetricRegistry#sources} map using copy-on-write principle. Increments version of registry. + * + * @param modifier Modifier for the sources map. Accepts the new, modifiable version of the map. Returns boolean value, whether + * the map was modified. + */ + private void modifySources(Function<Map<String, IgniteBiTuple<MetricSource, MetricSet>>, Boolean> modifier) { + lock.lock(); + + try { + Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources0 = new TreeMap<>(sources); + + boolean modified = modifier.apply(sources0); + + if (modified) { + sources = sources0; + + version++; + } + } finally { + lock.unlock(); + } + } + + /** + * Returns registry schema version. + * + * @return Version. + */ + public long version() { + return version; + } + + /** {@inheritDoc} */ + @Override + public Iterator<MetricSet> iterator() { Review Comment: An iterator doesn't provide repeatable behavior. You can't reuse iterator. A new iterator could be returned for new version. So we need to provide a method that returns version and map with metrics atomically. ########## modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metrics; + +import static java.util.Collections.unmodifiableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Distribution metric calculates counts of measurements that gets into each bounds interval. + * Note, that {@link #value()} will return array length of {@code bounds.length + 1}. + * Last element contains count of measurements bigger than most right value of bounds. + */ +public class DistributionMetric extends AbstractMetric implements CompositeMetric { + /** Count of measurement for each bound. */ + private final AtomicLongArray measurements; + + /** Bounds of measurements. */ + private final long[] bounds; + + /** List of scalar metrics. */ + private volatile List<Metric> scalarMetrics = null; + + /** + * The constructor. + * + * @param name Name. + * @param desc Description. + * @param bounds Bounds of the buckets. 
+ */ + public DistributionMetric(String name, @Nullable String desc, @NotNull long[] bounds) { + super(name, desc); + + assert bounds != null && bounds.length > 0; + assert isSorted(bounds); + + this.bounds = bounds; + this.measurements = new AtomicLongArray(bounds.length + 1); + } + + /** + * Check whether given array is sorted. + * + * @param arr Array to check. + * @return {@code True} if array sorted, {@code false} otherwise. + */ + private static boolean isSorted(@NotNull long[] arr) { + if (arr.length < 2) { + return true; + } + + for (int i = 1; i < arr.length; i++) { + if (arr[i - 1] > arr[i]) { + return false; + } + } + + return true; + } + + /** + * Adds a value to the interval which the value belongs to. + * + * @param x Value. + */ + public void add(long x) { + assert x >= 0; + + //Expect arrays of few elements. + for (int i = 0; i < bounds.length; i++) { + if (x <= bounds[i]) { + measurements.incrementAndGet(i); + + return; + } + } + + measurements.incrementAndGet(bounds.length); + } + + /** {@inheritDoc} */ + public long[] value() { + long[] res = new long[measurements.length()]; + + for (int i = 0; i < measurements.length(); i++) { + res[i] = measurements.get(i); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public @Nullable String getValueAsString() { + StringBuilder sb = new StringBuilder("["); + + List<Metric> scalarMetrics = asScalarMetrics(); + + for (int i = 0; i < scalarMetrics.size(); i++) { + LongMetric m = (LongMetric) scalarMetrics.get(i); + + sb.append(m.name()) + .append(':') + .append(m.value()); + + if (i < scalarMetrics.size() - 1) { + sb.append(", "); + } + } + + sb.append(']'); + + return sb.toString(); + } + + /** + * Bounds of the buckets of distribution. + * + * @return Bounds of the buckets of distribution. 
+ */ + public long[] bounds() { + return bounds; + } + + /** {@inheritDoc} */ + @Override public List<Metric> asScalarMetrics() { + if (scalarMetrics == null) { + List<Metric> metrics = new ArrayList<>(); + + for (int i = 0; i < measurements.length(); i++) { + String from = i == 0 ? "0" : String.valueOf(bounds[i - 1]); + String to = i == measurements.length() - 1 ? "" : String.valueOf(bounds[i]); + + String name = new StringBuilder(from).append('_').append(to).toString(); Review Comment: See AI 2 `HistogramMetric`. The name of the last range should end with `INF` or `Inf` or `Infinity`. ########## modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.metrics; + +import static java.util.Collections.unmodifiableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Distribution metric calculates counts of measurements that gets into each bounds interval. + * Note, that {@link #value()} will return array length of {@code bounds.length + 1}. + * Last element contains count of measurements bigger than most right value of bounds. + */ +public class DistributionMetric extends AbstractMetric implements CompositeMetric { + /** Count of measurement for each bound. */ + private final AtomicLongArray measurements; + + /** Bounds of measurements. */ + private final long[] bounds; + + /** List of scalar metrics. */ + private volatile List<Metric> scalarMetrics = null; + + /** + * The constructor. + * + * @param name Name. + * @param desc Description. + * @param bounds Bounds of the buckets. + */ + public DistributionMetric(String name, @Nullable String desc, @NotNull long[] bounds) { + super(name, desc); + + assert bounds != null && bounds.length > 0; + assert isSorted(bounds); + + this.bounds = bounds; + this.measurements = new AtomicLongArray(bounds.length + 1); + } + + /** + * Check whether given array is sorted. + * + * @param arr Array to check. + * @return {@code True} if array sorted, {@code false} otherwise. + */ + private static boolean isSorted(@NotNull long[] arr) { + if (arr.length < 2) { + return true; + } + + for (int i = 1; i < arr.length; i++) { + if (arr[i - 1] > arr[i]) { + return false; + } + } + + return true; + } + + /** + * Adds a value to the interval which the value belongs to. + * + * @param x Value. + */ + public void add(long x) { + assert x >= 0; + + //Expect arrays of few elements. 
+ for (int i = 0; i < bounds.length; i++) { + if (x <= bounds[i]) { + measurements.incrementAndGet(i); + + return; + } + } + + measurements.incrementAndGet(bounds.length); + } + + /** {@inheritDoc} */ + public long[] value() { + long[] res = new long[measurements.length()]; + + for (int i = 0; i < measurements.length(); i++) { + res[i] = measurements.get(i); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public @Nullable String getValueAsString() { Review Comment: Why is it annotated as `@Nullable`? ########## modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metrics; + +import static java.util.Collections.unmodifiableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Distribution metric calculates counts of measurements that gets into each bounds interval. 
+ * Note, that {@link #value()} will return array length of {@code bounds.length + 1}. + * Last element contains count of measurements bigger than most right value of bounds. + */ +public class DistributionMetric extends AbstractMetric implements CompositeMetric { + /** Count of measurement for each bound. */ + private final AtomicLongArray measurements; + + /** Bounds of measurements. */ + private final long[] bounds; + + /** List of scalar metrics. */ + private volatile List<Metric> scalarMetrics = null; + + /** + * The constructor. + * + * @param name Name. + * @param desc Description. + * @param bounds Bounds of the buckets. + */ + public DistributionMetric(String name, @Nullable String desc, @NotNull long[] bounds) { + super(name, desc); + + assert bounds != null && bounds.length > 0; + assert isSorted(bounds); + + this.bounds = bounds; + this.measurements = new AtomicLongArray(bounds.length + 1); + } + + /** + * Check whether given array is sorted. + * + * @param arr Array to check. + * @return {@code True} if array sorted, {@code false} otherwise. + */ + private static boolean isSorted(@NotNull long[] arr) { + if (arr.length < 2) { + return true; + } + + for (int i = 1; i < arr.length; i++) { + if (arr[i - 1] > arr[i]) { + return false; + } + } + + return true; + } + + /** + * Adds a value to the interval which the value belongs to. + * + * @param x Value. + */ + public void add(long x) { + assert x >= 0; + + //Expect arrays of few elements. 
+ for (int i = 0; i < bounds.length; i++) { + if (x <= bounds[i]) { + measurements.incrementAndGet(i); + + return; + } + } + + measurements.incrementAndGet(bounds.length); + } + + /** {@inheritDoc} */ + public long[] value() { + long[] res = new long[measurements.length()]; + + for (int i = 0; i < measurements.length(); i++) { + res[i] = measurements.get(i); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public @Nullable String getValueAsString() { Review Comment: Method annotation must be on the previous line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
