agura commented on code in PR #957:
URL: https://github.com/apache/ignite-3/pull/957#discussion_r943269284


##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Distribution metric calculates counts of measurements that gets into each 
bounds interval.
+ * Note, that {@link #value()} will return array length of {@code 
bounds.length + 1}.
+ * Last element contains count of measurements bigger than most right value of 
bounds.
+ */
+public class DistributionMetric extends AbstractMetric implements 
CompositeMetric {
+    /** Count of measurement for each bound. */
+    private final AtomicLongArray measurements;
+
+    /** Bounds of measurements. */
+    private final long[] bounds;
+
+    /** List of scalar metrics. */
+    private volatile List<Metric> scalarMetrics = null;
+
+    /**
+     * The constructor.
+     *
+     * @param name Name.
+     * @param desc Description.
+     * @param bounds Bounds of the buckets.
+     */
+    public DistributionMetric(String name, @Nullable String desc, @NotNull 
long[] bounds) {
+        super(name, desc);
+
+        assert bounds != null && bounds.length > 0;
+        assert isSorted(bounds);
+
+        this.bounds = bounds;
+        this.measurements = new AtomicLongArray(bounds.length + 1);
+    }
+
+    /**
+     * Check whether given array is sorted.
+     *
+     * @param arr Array to check.
+     * @return {@code True} if array sorted, {@code false} otherwise.
+     */
+    private static boolean isSorted(@NotNull long[] arr) {
+        if (arr.length < 2) {
+            return true;
+        }
+
+        for (int i = 1; i < arr.length; i++) {
+            if (arr[i - 1] > arr[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Adds a value to the interval which the value belongs to.
+     *
+     * @param x Value.
+     */
+    public void add(long x) {
+        assert x >= 0;
+
+        //Expect arrays of few elements.
+        for (int i = 0; i < bounds.length; i++) {
+            if (x <= bounds[i]) {
+                measurements.incrementAndGet(i);
+
+                return;
+            }
+        }
+
+        measurements.incrementAndGet(bounds.length);
+    }
+
+    /** {@inheritDoc} */
+    public long[] value() {
+        long[] res = new long[measurements.length()];
+
+        for (int i = 0; i < measurements.length(); i++) {
+            res[i] = measurements.get(i);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable String getValueAsString() {
+        StringBuilder sb = new StringBuilder("[");
+
+        List<Metric> scalarMetrics = asScalarMetrics();
+
+        for (int i = 0; i < scalarMetrics.size(); i++) {
+            LongMetric m = (LongMetric) scalarMetrics.get(i);
+
+            sb.append(m.name())
+                    .append(':')
+                    .append(m.value());
+
+            if (i < scalarMetrics.size() - 1) {
+                sb.append(", ");
+            }
+        }
+
+        sb.append(']');
+
+        return sb.toString();
+    }
+
+    /**
+     * Bounds of the buckets of distribution.
+     *
+     * @return Bounds of the buckets of distribution.
+     */
+    public long[] bounds() {
+        return bounds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Metric> asScalarMetrics() {

Review Comment:
   The annotation must be on the previous line. It seems that this method 
should not be public; package-private should be enough.



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Distribution metric calculates counts of measurements that gets into each 
bounds interval.
+ * Note, that {@link #value()} will return array length of {@code 
bounds.length + 1}.
+ * Last element contains count of measurements bigger than most right value of 
bounds.
+ */
+public class DistributionMetric extends AbstractMetric implements 
CompositeMetric {
+    /** Count of measurement for each bound. */
+    private final AtomicLongArray measurements;
+
+    /** Bounds of measurements. */
+    private final long[] bounds;
+
+    /** List of scalar metrics. */
+    private volatile List<Metric> scalarMetrics = null;
+
+    /**
+     * The constructor.
+     *
+     * @param name Name.
+     * @param desc Description.
+     * @param bounds Bounds of the buckets.
+     */
+    public DistributionMetric(String name, @Nullable String desc, @NotNull 
long[] bounds) {
+        super(name, desc);
+
+        assert bounds != null && bounds.length > 0;
+        assert isSorted(bounds);
+
+        this.bounds = bounds;
+        this.measurements = new AtomicLongArray(bounds.length + 1);
+    }
+
+    /**
+     * Check whether given array is sorted.
+     *
+     * @param arr Array to check.
+     * @return {@code True} if array sorted, {@code false} otherwise.
+     */
+    private static boolean isSorted(@NotNull long[] arr) {
+        if (arr.length < 2) {
+            return true;
+        }
+
+        for (int i = 1; i < arr.length; i++) {
+            if (arr[i - 1] > arr[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Adds a value to the interval which the value belongs to.
+     *
+     * @param x Value.
+     */
+    public void add(long x) {
+        assert x >= 0;
+
+        //Expect arrays of few elements.
+        for (int i = 0; i < bounds.length; i++) {
+            if (x <= bounds[i]) {
+                measurements.incrementAndGet(i);
+
+                return;
+            }
+        }
+
+        measurements.incrementAndGet(bounds.length);
+    }
+
+    /** {@inheritDoc} */
+    public long[] value() {
+        long[] res = new long[measurements.length()];
+
+        for (int i = 0; i < measurements.length(); i++) {
+            res[i] = measurements.get(i);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable String getValueAsString() {
+        StringBuilder sb = new StringBuilder("[");
+
+        List<Metric> scalarMetrics = asScalarMetrics();
+
+        for (int i = 0; i < scalarMetrics.size(); i++) {
+            LongMetric m = (LongMetric) scalarMetrics.get(i);
+
+            sb.append(m.name())
+                    .append(':')
+                    .append(m.value());
+
+            if (i < scalarMetrics.size() - 1) {
+                sb.append(", ");
+            }
+        }
+
+        sb.append(']');
+
+        return sb.toString();
+    }
+
+    /**
+     * Bounds of the buckets of distribution.
+     *
+     * @return Bounds of the buckets of distribution.
+     */
+    public long[] bounds() {
+        return bounds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Metric> asScalarMetrics() {
+        if (scalarMetrics == null) {
+            List<Metric> metrics = new ArrayList<>();
+
+            for (int i = 0; i < measurements.length(); i++) {
+                String from = i == 0 ? "0" : String.valueOf(bounds[i - 1]);
+                String to = i == measurements.length() - 1 ? "" : 
String.valueOf(bounds[i]);
+
+                String name = new 
StringBuilder(from).append('_').append(to).toString();
+
+                final int index = i;
+                LongGauge gauge = new LongGauge(name, "Single distribution 
bucket", () -> measurements.get(index));
+
+                metrics.add(gauge);
+            }
+
+            AtomicReferenceFieldUpdater<DistributionMetric, List> updater =

Review Comment:
   The main idea is to have exactly one instance of an atomic field updater 
instead of many instances of atomic classes. The current implementation 
doesn't make sense. See `AbstractMetricSource` for an example of using an 
atomic field updater.



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Distribution metric calculates counts of measurements that gets into each 
bounds interval.
+ * Note, that {@link #value()} will return array length of {@code 
bounds.length + 1}.
+ * Last element contains count of measurements bigger than most right value of 
bounds.
+ */
+public class DistributionMetric extends AbstractMetric implements 
CompositeMetric {
+    /** Count of measurement for each bound. */
+    private final AtomicLongArray measurements;
+
+    /** Bounds of measurements. */
+    private final long[] bounds;
+
+    /** List of scalar metrics. */
+    private volatile List<Metric> scalarMetrics = null;
+
+    /**
+     * The constructor.
+     *
+     * @param name Name.
+     * @param desc Description.
+     * @param bounds Bounds of the buckets.
+     */
+    public DistributionMetric(String name, @Nullable String desc, @NotNull 
long[] bounds) {
+        super(name, desc);
+
+        assert bounds != null && bounds.length > 0;
+        assert isSorted(bounds);
+
+        this.bounds = bounds;
+        this.measurements = new AtomicLongArray(bounds.length + 1);
+    }
+
+    /**
+     * Check whether given array is sorted.
+     *
+     * @param arr Array to check.
+     * @return {@code True} if array sorted, {@code false} otherwise.
+     */
+    private static boolean isSorted(@NotNull long[] arr) {
+        if (arr.length < 2) {
+            return true;
+        }
+
+        for (int i = 1; i < arr.length; i++) {
+            if (arr[i - 1] > arr[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Adds a value to the interval which the value belongs to.
+     *
+     * @param x Value.
+     */
+    public void add(long x) {
+        assert x >= 0;
+
+        //Expect arrays of few elements.
+        for (int i = 0; i < bounds.length; i++) {
+            if (x <= bounds[i]) {
+                measurements.incrementAndGet(i);
+
+                return;
+            }
+        }
+
+        measurements.incrementAndGet(bounds.length);
+    }
+
+    /** {@inheritDoc} */
+    public long[] value() {
+        long[] res = new long[measurements.length()];
+
+        for (int i = 0; i < measurements.length(); i++) {
+            res[i] = measurements.get(i);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable String getValueAsString() {
+        StringBuilder sb = new StringBuilder("[");
+
+        List<Metric> scalarMetrics = asScalarMetrics();
+
+        for (int i = 0; i < scalarMetrics.size(); i++) {
+            LongMetric m = (LongMetric) scalarMetrics.get(i);
+
+            sb.append(m.name())
+                    .append(':')
+                    .append(m.value());
+
+            if (i < scalarMetrics.size() - 1) {
+                sb.append(", ");
+            }
+        }
+
+        sb.append(']');
+
+        return sb.toString();
+    }
+
+    /**
+     * Bounds of the buckets of distribution.
+     *
+     * @return Bounds of the buckets of distribution.
+     */
+    public long[] bounds() {
+        return bounds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Metric> asScalarMetrics() {
+        if (scalarMetrics == null) {
+            List<Metric> metrics = new ArrayList<>();
+
+            for (int i = 0; i < measurements.length(); i++) {
+                String from = i == 0 ? "0" : String.valueOf(bounds[i - 1]);
+                String to = i == measurements.length() - 1 ? "" : 
String.valueOf(bounds[i]);
+
+                String name = new 
StringBuilder(from).append('_').append(to).toString();

Review Comment:
   Redundant `StringBuilder` instance.



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricRegistry.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.ignite.internal.util.FilteringIterator;
+import org.apache.ignite.internal.util.TransformingIterator;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Metric registry. Metrics source (see {@link MetricSource} must be 
registered in this metrics registry after initialization
+ * of corresponding component and must be unregistered in case of component is 
destroyed or stopped. Metrics registry also
+ * provides access to all enabled metrics through corresponding metrics sets. 
Metrics registry lifetime is equal to the node lifetime.
+ * <br>
+ * Implements an {@link Iterable} over the metric sets for enabled metric 
sources.
+ */
+public class MetricRegistry implements Iterable<MetricSet> {
+    private final Lock lock = new ReentrantLock();
+
+    /** Map of metric sources' names to tuples of registered sources with 
metric sets, if enabled. */
+    private volatile Map<String, IgniteBiTuple<MetricSource, MetricSet>> 
sources = new TreeMap<>();
+
+    /** Version always should be changed on metrics enabled/disabled action. */
+    private volatile long version;
+
+    /**
+     * Register metric source. It must be registered in this metrics registry 
after initialization of corresponding component
+     * and must be unregistered in case of component is destroyed or stopped, 
see {@link #unregisterSource(MetricSource)}.
+     * By registering, the metric source isn't enabled implicitly.
+     *
+     * @param src Metric source.
+     * @throws IllegalStateException If metric source with the given name 
already exists.
+     */
+    public void registerSource(MetricSource src) {
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> s = new 
IgniteBiTuple<>(src, null);
+
+            IgniteBiTuple<MetricSource, MetricSet> old = 
sources.putIfAbsent(src.name(), s);
+
+            if (old != null) {
+                throw new IllegalStateException("Metrics source with given 
name already exists: " + src.name());
+            }
+
+            // Now we sure that this metric source wasn't registered before.
+            assert !src.enabled() : "Metric source shouldn't be enabled before 
registration in registry.";

Review Comment:
   Why shouldn't a metric source be enabled before registering? This is too 
strict a limitation because the `MetricSource` contract doesn't prohibit this.



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Distribution metric calculates counts of measurements that gets into each 
bounds interval.
+ * Note, that {@link #value()} will return array length of {@code 
bounds.length + 1}.
+ * Last element contains count of measurements bigger than most right value of 
bounds.
+ */
+public class DistributionMetric extends AbstractMetric implements 
CompositeMetric {
+    /** Count of measurement for each bound. */
+    private final AtomicLongArray measurements;
+
+    /** Bounds of measurements. */
+    private final long[] bounds;
+
+    /** List of scalar metrics. */
+    private volatile List<Metric> scalarMetrics = null;
+
+    /**
+     * The constructor.
+     *
+     * @param name Name.
+     * @param desc Description.
+     * @param bounds Bounds of the buckets.
+     */
+    public DistributionMetric(String name, @Nullable String desc, @NotNull 
long[] bounds) {
+        super(name, desc);
+
+        assert bounds != null && bounds.length > 0;
+        assert isSorted(bounds);
+
+        this.bounds = bounds;
+        this.measurements = new AtomicLongArray(bounds.length + 1);
+    }
+
+    /**
+     * Check whether given array is sorted.
+     *
+     * @param arr Array to check.
+     * @return {@code True} if array sorted, {@code false} otherwise.
+     */
+    private static boolean isSorted(@NotNull long[] arr) {
+        if (arr.length < 2) {
+            return true;
+        }
+
+        for (int i = 1; i < arr.length; i++) {
+            if (arr[i - 1] > arr[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Adds a value to the interval which the value belongs to.
+     *
+     * @param x Value.
+     */
+    public void add(long x) {
+        assert x >= 0;
+
+        //Expect arrays of few elements.
+        for (int i = 0; i < bounds.length; i++) {
+            if (x <= bounds[i]) {
+                measurements.incrementAndGet(i);
+
+                return;
+            }
+        }
+
+        measurements.incrementAndGet(bounds.length);
+    }
+
+    /** {@inheritDoc} */
+    public long[] value() {
+        long[] res = new long[measurements.length()];
+
+        for (int i = 0; i < measurements.length(); i++) {
+            res[i] = measurements.get(i);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable String getValueAsString() {
+        StringBuilder sb = new StringBuilder("[");
+
+        List<Metric> scalarMetrics = asScalarMetrics();
+
+        for (int i = 0; i < scalarMetrics.size(); i++) {
+            LongMetric m = (LongMetric) scalarMetrics.get(i);
+
+            sb.append(m.name())
+                    .append(':')
+                    .append(m.value());
+
+            if (i < scalarMetrics.size() - 1) {
+                sb.append(", ");
+            }
+        }
+
+        sb.append(']');
+
+        return sb.toString();
+    }
+
+    /**
+     * Bounds of the buckets of distribution.
+     *
+     * @return Bounds of the buckets of distribution.
+     */
+    public long[] bounds() {
+        return bounds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Metric> asScalarMetrics() {
+        if (scalarMetrics == null) {
+            List<Metric> metrics = new ArrayList<>();
+
+            for (int i = 0; i < measurements.length(); i++) {
+                String from = i == 0 ? "0" : String.valueOf(bounds[i - 1]);

Review Comment:
   It seems that the original implementation doesn't have a 0-bucket. Please 
recheck the `MetricsUtils.histogramBucketNames` method in AI2.



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricRegistry.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.ignite.internal.util.FilteringIterator;
+import org.apache.ignite.internal.util.TransformingIterator;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Metric registry. Metrics source (see {@link MetricSource} must be 
registered in this metrics registry after initialization
+ * of corresponding component and must be unregistered in case of component is 
destroyed or stopped. Metrics registry also
+ * provides access to all enabled metrics through corresponding metrics sets. 
Metrics registry lifetime is equal to the node lifetime.
+ * <br>
+ * Implements an {@link Iterable} over the metric sets for enabled metric 
sources.
+ */
+public class MetricRegistry implements Iterable<MetricSet> {
+    private final Lock lock = new ReentrantLock();
+
+    /** Map of metric sources' names to tuples of registered sources with 
metric sets, if enabled. */
+    private volatile Map<String, IgniteBiTuple<MetricSource, MetricSet>> 
sources = new TreeMap<>();
+
+    /** Version always should be changed on metrics enabled/disabled action. */
+    private volatile long version;
+
+    /**
+     * Register metric source. It must be registered in this metrics registry 
after initialization of corresponding component
+     * and must be unregistered in case of component is destroyed or stopped, 
see {@link #unregisterSource(MetricSource)}.
+     * By registering, the metric source isn't enabled implicitly.
+     *
+     * @param src Metric source.
+     * @throws IllegalStateException If metric source with the given name 
already exists.
+     */
+    public void registerSource(MetricSource src) {
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> s = new 
IgniteBiTuple<>(src, null);
+
+            IgniteBiTuple<MetricSource, MetricSet> old = 
sources.putIfAbsent(src.name(), s);
+
+            if (old != null) {
+                throw new IllegalStateException("Metrics source with given 
name already exists: " + src.name());
+            }
+
+            // Now we sure that this metric source wasn't registered before.
+            assert !src.enabled() : "Metric source shouldn't be enabled before 
registration in registry.";
+
+            return true;
+        });
+    }
+
+    /**
+     * Unregister metric source. It must be unregistered in case of 
corresponding component is destroyed or stopped.
+     * Metric source is also disabled while unregistered, see {@link 
#disable(String)}.
+     *
+     * @param src Metric source.
+     */
+    public void unregisterSource(MetricSource src) {
+        unregisterSource(src.name());
+    }
+
+    /**
+     * Unregister metric source. It must be unregistered in case of 
corresponding component is destroyed or stopped.
+     * Metric source is also disabled while unregistered, see {@link 
#disable(String)}.
+     *
+     * @param srcName Metric source name.
+     */
+    public void unregisterSource(String srcName) {
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> s = sources.get(srcName);
+
+            if (s == null) {
+                return false;
+            }
+
+            assert s.get1() != null;
+
+            s.get1().disable();
+
+            sources.remove(srcName);
+
+            return true;
+        });
+    }
+
+    /**
+     * Enable metric set for the given metric source.
+     *
+     * @param src Metric source.
+     * @return Metric set, or {@code null} if the metric set is already 
enabled.
+     * @throws IllegalStateException If metric source isn't registered.
+     * @throws IllegalArgumentException If metric source isn't the same as 
registered.
+     */
+    public MetricSet enable(@NotNull MetricSource src) {
+        AtomicReference<MetricSet> metricSetRef = new AtomicReference<>();
+
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> registered = 
checkRegistered(sources, src);
+
+            if (registered.get2() != null) {
+                assert src.enabled();
+                return false;
+            }
+
+            MetricSet metricSet = src.enable();
+
+            assert metricSet != null;
+
+            IgniteBiTuple<MetricSource, MetricSet> updated = new 
IgniteBiTuple<>(src, metricSet);
+
+            sources.put(src.name(), updated);
+
+            metricSetRef.set(metricSet);
+
+            return true;
+        });
+
+        return metricSetRef.get();
+    }
+
+    /**
+     * Enable metric set for the given metric source.
+     *
+     * @param srcName Metric source name.
+     * @return Metric set, or {@code null} if the metric set is already 
enabled.
+     * @throws IllegalStateException If metric source with the given name 
doesn't exist.
+     */
+    public MetricSet enable(final String srcName) {
+        AtomicReference<MetricSet> metricSetRef = new AtomicReference<>();
+
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> registered = 
sources.get(srcName);
+
+            if (registered == null) {
+                throw new IllegalStateException("Metrics source with given 
name doesn't exist: " + srcName);
+            }
+
+            MetricSource src = registered.get1();
+
+            if (registered.get2() != null) {
+                assert src.enabled();
+                return false;
+            }
+
+            MetricSet metricSet = src.enable();
+            assert metricSet != null;
+
+            IgniteBiTuple<MetricSource, MetricSet> updated = new 
IgniteBiTuple<>(src, metricSet);
+
+            sources.put(src.name(), updated);
+
+            metricSetRef.set(metricSet);
+
+            return true;
+        });
+
+        return metricSetRef.get();
+    }
+
+    /**
+     * Disable metric set for the given metric source.
+     *
+     * @param src Metric source.
+     * @throws IllegalStateException If metric source isn't registered.
+     * @throws IllegalArgumentException If metric source isn't the same as 
registered.
+     */
+    public void disable(@NotNull MetricSource src) {
+        modifySources(sources -> {
+            checkRegistered(sources, src);
+
+            src.disable();
+
+            IgniteBiTuple<MetricSource, MetricSet> updated = new 
IgniteBiTuple<>(src, null);
+
+            sources.put(src.name(), updated);
+
+            return true;
+        });
+    }
+
+    /**
+     * Disable metric set for the given metric source.
+     *
+     * @param srcName Metric source name.
+     * @throws IllegalStateException If metric source with given name doesn't 
exists.
+     */
+    public void disable(final String srcName) {
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> registered = 
sources.get(srcName);
+
+            if (registered == null) {
+                throw new IllegalStateException("Metrics source with given 
name doesn't exists: " + srcName);
+            }
+
+            MetricSource src = registered.get1();
+
+            src.disable();
+
+            IgniteBiTuple<MetricSource, MetricSet> updated = new 
IgniteBiTuple<>(src, null);
+
+            sources.put(src.name(), updated);
+
+            return true;
+        });
+    }
+
+    /**
+     * Check that the given metric source is registered.
+     *
+     * @param sources Sources map.
+     * @param src Metric source.
+     * @return Registered pair of metric source and metric set.
+     * @throws IllegalStateException If metric source isn't registered.
+     * @throws IllegalArgumentException If metric source isn't the same as 
registered.
+     */
+    @NotNull
+    private IgniteBiTuple<MetricSource, MetricSet> checkRegistered(
+            Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources,
+            @NotNull MetricSource src
+    ) {
+        requireNonNull(src);
+
+        IgniteBiTuple<MetricSource, MetricSet> registered = 
sources.get(src.name());
+
+        if (registered == null) {
+            throw new IllegalStateException("Metrics source isn't registered: 
" + src.name());
+        }
+
+        if (!src.equals(registered.get1())) {
+            throw new IllegalArgumentException("Given metric source is not the 
same as registered by the same name: " + src.name());
+        }
+
+        return registered;
+    }
+
+    /**
+     * Updates {@link MetricRegistry#sources} map using copy-on-write 
principle. Increments version of registry.
+     *
+     * @param modifier Modifier for the sources map. Accepts the new, 
modifiable version of the map. Returns boolean value, whether
+     *                 the map was modified.
+     */
+    private void modifySources(Function<Map<String, 
IgniteBiTuple<MetricSource, MetricSet>>, Boolean> modifier) {
+        lock.lock();
+
+        try {
+            Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources0 = new 
TreeMap<>(sources);
+
+            boolean modified = modifier.apply(sources0);
+
+            if (modified) {
+                sources = sources0;
+
+                version++;

Review Comment:
   This is incorrect behavior because, for example, registering a metric 
source doesn't change the metrics schema until the source is enabled.
   
   The previous implementation was better because metrics are versioned while 
sources are not.



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricRegistry.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.ignite.internal.util.FilteringIterator;
+import org.apache.ignite.internal.util.TransformingIterator;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Metric registry. Metrics source (see {@link MetricSource} must be 
registered in this metrics registry after initialization
+ * of corresponding component and must be unregistered in case of component is 
destroyed or stopped. Metrics registry also
+ * provides access to all enabled metrics through corresponding metrics sets. 
Metrics registry lifetime is equal to the node lifetime.
+ * <br>
+ * Implements an {@link Iterable} over the metric sets for enabled metric 
sources.
+ */
+public class MetricRegistry implements Iterable<MetricSet> {
+    private final Lock lock = new ReentrantLock();
+
+    /** Map of metric sources' names to tuples of registered sources with 
metric sets, if enabled. */
+    private volatile Map<String, IgniteBiTuple<MetricSource, MetricSet>> 
sources = new TreeMap<>();
+
+    /** Version always should be changed on metrics enabled/disabled action. */
+    private volatile long version;
+
+    /**
+     * Register metric source. It must be registered in this metrics registry 
after initialization of corresponding component
+     * and must be unregistered in case of component is destroyed or stopped, 
see {@link #unregisterSource(MetricSource)}.
+     * By registering, the metric source isn't enabled implicitly.
+     *
+     * @param src Metric source.
+     * @throws IllegalStateException If metric source with the given name 
already exists.
+     */
+    public void registerSource(MetricSource src) {
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> s = new 
IgniteBiTuple<>(src, null);
+
+            IgniteBiTuple<MetricSource, MetricSet> old = 
sources.putIfAbsent(src.name(), s);
+
+            if (old != null) {
+                throw new IllegalStateException("Metrics source with given 
name already exists: " + src.name());
+            }
+
+            // Now we sure that this metric source wasn't registered before.
+            assert !src.enabled() : "Metric source shouldn't be enabled before 
registration in registry.";
+
+            return true;
+        });
+    }
+
+    /**
+     * Unregister metric source. It must be unregistered in case of 
corresponding component is destroyed or stopped.
+     * Metric source is also disabled while unregistered, see {@link 
#disable(String)}.
+     *
+     * @param src Metric source.
+     */
+    public void unregisterSource(MetricSource src) {
+        unregisterSource(src.name());
+    }
+
+    /**
+     * Unregister metric source. It must be unregistered in case of 
corresponding component is destroyed or stopped.
+     * Metric source is also disabled while unregistered, see {@link 
#disable(String)}.
+     *
+     * @param srcName Metric source name.
+     */
+    public void unregisterSource(String srcName) {
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> s = sources.get(srcName);
+
+            if (s == null) {
+                return false;
+            }
+
+            assert s.get1() != null;
+
+            s.get1().disable();
+
+            sources.remove(srcName);
+
+            return true;
+        });
+    }
+
+    /**
+     * Enable metric set for the given metric source.
+     *
+     * @param src Metric source.
+     * @return Metric set, or {@code null} if the metric set is already 
enabled.
+     * @throws IllegalStateException If metric source isn't registered.
+     * @throws IllegalArgumentException If metric source isn't the same as 
registered.
+     */
+    public MetricSet enable(@NotNull MetricSource src) {
+        AtomicReference<MetricSet> metricSetRef = new AtomicReference<>();
+
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> registered = 
checkRegistered(sources, src);
+
+            if (registered.get2() != null) {
+                assert src.enabled();
+                return false;
+            }
+
+            MetricSet metricSet = src.enable();
+
+            assert metricSet != null;
+
+            IgniteBiTuple<MetricSource, MetricSet> updated = new 
IgniteBiTuple<>(src, metricSet);
+
+            sources.put(src.name(), updated);
+
+            metricSetRef.set(metricSet);
+
+            return true;
+        });
+
+        return metricSetRef.get();
+    }
+
+    /**
+     * Enable metric set for the given metric source.
+     *
+     * @param srcName Metric source name.
+     * @return Metric set, or {@code null} if the metric set is already 
enabled.
+     * @throws IllegalStateException If metric source with the given name 
doesn't exist.
+     */
+    public MetricSet enable(final String srcName) {
+        AtomicReference<MetricSet> metricSetRef = new AtomicReference<>();
+
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> registered = 
sources.get(srcName);
+
+            if (registered == null) {
+                throw new IllegalStateException("Metrics source with given 
name doesn't exist: " + srcName);
+            }
+
+            MetricSource src = registered.get1();
+
+            if (registered.get2() != null) {
+                assert src.enabled();
+                return false;
+            }
+
+            MetricSet metricSet = src.enable();
+            assert metricSet != null;
+
+            IgniteBiTuple<MetricSource, MetricSet> updated = new 
IgniteBiTuple<>(src, metricSet);
+
+            sources.put(src.name(), updated);
+
+            metricSetRef.set(metricSet);
+
+            return true;
+        });
+
+        return metricSetRef.get();
+    }
+
+    /**
+     * Disable metric set for the given metric source.
+     *
+     * @param src Metric source.
+     * @throws IllegalStateException If metric source isn't registered.
+     * @throws IllegalArgumentException If metric source isn't the same as 
registered.
+     */
+    public void disable(@NotNull MetricSource src) {
+        modifySources(sources -> {
+            checkRegistered(sources, src);
+
+            src.disable();
+
+            IgniteBiTuple<MetricSource, MetricSet> updated = new 
IgniteBiTuple<>(src, null);
+
+            sources.put(src.name(), updated);
+
+            return true;
+        });
+    }
+
+    /**
+     * Disable metric set for the given metric source.
+     *
+     * @param srcName Metric source name.
+     * @throws IllegalStateException If metric source with given name doesn't 
exists.
+     */
+    public void disable(final String srcName) {
+        modifySources(sources -> {
+            IgniteBiTuple<MetricSource, MetricSet> registered = 
sources.get(srcName);
+
+            if (registered == null) {
+                throw new IllegalStateException("Metrics source with given 
name doesn't exists: " + srcName);
+            }
+
+            MetricSource src = registered.get1();
+
+            src.disable();
+
+            IgniteBiTuple<MetricSource, MetricSet> updated = new 
IgniteBiTuple<>(src, null);
+
+            sources.put(src.name(), updated);
+
+            return true;
+        });
+    }
+
+    /**
+     * Check that the given metric source is registered.
+     *
+     * @param sources Sources map.
+     * @param src Metric source.
+     * @return Registered pair of metric source and metric set.
+     * @throws IllegalStateException If metric source isn't registered.
+     * @throws IllegalArgumentException If metric source isn't the same as 
registered.
+     */
+    @NotNull
+    private IgniteBiTuple<MetricSource, MetricSet> checkRegistered(
+            Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources,
+            @NotNull MetricSource src
+    ) {
+        requireNonNull(src);
+
+        IgniteBiTuple<MetricSource, MetricSet> registered = 
sources.get(src.name());
+
+        if (registered == null) {
+            throw new IllegalStateException("Metrics source isn't registered: 
" + src.name());
+        }
+
+        if (!src.equals(registered.get1())) {
+            throw new IllegalArgumentException("Given metric source is not the 
same as registered by the same name: " + src.name());
+        }
+
+        return registered;
+    }
+
+    /**
+     * Updates {@link MetricRegistry#sources} map using copy-on-write 
principle. Increments version of registry.
+     *
+     * @param modifier Modifier for the sources map. Accepts the new, 
modifiable version of the map. Returns boolean value, whether
+     *                 the map was modified.
+     */
+    private void modifySources(Function<Map<String, 
IgniteBiTuple<MetricSource, MetricSet>>, Boolean> modifier) {
+        lock.lock();
+
+        try {
+            Map<String, IgniteBiTuple<MetricSource, MetricSet>> sources0 = new 
TreeMap<>(sources);
+
+            boolean modified = modifier.apply(sources0);
+
+            if (modified) {
+                sources = sources0;
+
+                version++;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns registry schema version.
+     *
+     * @return Version.
+     */
+    public long version() {
+        return version;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Iterator<MetricSet> iterator() {

Review Comment:
   An iterator doesn't provide repeatable behavior. You can't reuse an iterator. A 
new iterator could be returned for a new version. So we need to provide a method 
that returns the version and the map of metrics atomically.



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Distribution metric calculates counts of measurements that gets into each 
bounds interval.
+ * Note, that {@link #value()} will return array length of {@code 
bounds.length + 1}.
+ * Last element contains count of measurements bigger than most right value of 
bounds.
+ */
+public class DistributionMetric extends AbstractMetric implements 
CompositeMetric {
+    /** Count of measurement for each bound. */
+    private final AtomicLongArray measurements;
+
+    /** Bounds of measurements. */
+    private final long[] bounds;
+
+    /** List of scalar metrics. */
+    private volatile List<Metric> scalarMetrics = null;
+
+    /**
+     * The constructor.
+     *
+     * @param name Name.
+     * @param desc Description.
+     * @param bounds Bounds of the buckets.
+     */
+    public DistributionMetric(String name, @Nullable String desc, @NotNull 
long[] bounds) {
+        super(name, desc);
+
+        assert bounds != null && bounds.length > 0;
+        assert isSorted(bounds);
+
+        this.bounds = bounds;
+        this.measurements = new AtomicLongArray(bounds.length + 1);
+    }
+
+    /**
+     * Check whether given array is sorted.
+     *
+     * @param arr Array to check.
+     * @return {@code True} if array sorted, {@code false} otherwise.
+     */
+    private static boolean isSorted(@NotNull long[] arr) {
+        if (arr.length < 2) {
+            return true;
+        }
+
+        for (int i = 1; i < arr.length; i++) {
+            if (arr[i - 1] > arr[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Adds a value to the interval which the value belongs to.
+     *
+     * @param x Value.
+     */
+    public void add(long x) {
+        assert x >= 0;
+
+        //Expect arrays of few elements.
+        for (int i = 0; i < bounds.length; i++) {
+            if (x <= bounds[i]) {
+                measurements.incrementAndGet(i);
+
+                return;
+            }
+        }
+
+        measurements.incrementAndGet(bounds.length);
+    }
+
+    /** {@inheritDoc} */
+    public long[] value() {
+        long[] res = new long[measurements.length()];
+
+        for (int i = 0; i < measurements.length(); i++) {
+            res[i] = measurements.get(i);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable String getValueAsString() {
+        StringBuilder sb = new StringBuilder("[");
+
+        List<Metric> scalarMetrics = asScalarMetrics();
+
+        for (int i = 0; i < scalarMetrics.size(); i++) {
+            LongMetric m = (LongMetric) scalarMetrics.get(i);
+
+            sb.append(m.name())
+                    .append(':')
+                    .append(m.value());
+
+            if (i < scalarMetrics.size() - 1) {
+                sb.append(", ");
+            }
+        }
+
+        sb.append(']');
+
+        return sb.toString();
+    }
+
+    /**
+     * Bounds of the buckets of distribution.
+     *
+     * @return Bounds of the buckets of distribution.
+     */
+    public long[] bounds() {
+        return bounds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Metric> asScalarMetrics() {
+        if (scalarMetrics == null) {
+            List<Metric> metrics = new ArrayList<>();
+
+            for (int i = 0; i < measurements.length(); i++) {
+                String from = i == 0 ? "0" : String.valueOf(bounds[i - 1]);
+                String to = i == measurements.length() - 1 ? "" : 
String.valueOf(bounds[i]);
+
+                String name = new 
StringBuilder(from).append('_').append(to).toString();

Review Comment:
   See AI 2 `HistogramMetric`. The name of the last range should end with `INF` 
or `Inf` or `Infinity`.



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Distribution metric calculates counts of measurements that gets into each 
bounds interval.
+ * Note, that {@link #value()} will return array length of {@code 
bounds.length + 1}.
+ * Last element contains count of measurements bigger than most right value of 
bounds.
+ */
+public class DistributionMetric extends AbstractMetric implements 
CompositeMetric {
+    /** Count of measurement for each bound. */
+    private final AtomicLongArray measurements;
+
+    /** Bounds of measurements. */
+    private final long[] bounds;
+
+    /** List of scalar metrics. */
+    private volatile List<Metric> scalarMetrics = null;
+
+    /**
+     * The constructor.
+     *
+     * @param name Name.
+     * @param desc Description.
+     * @param bounds Bounds of the buckets.
+     */
+    public DistributionMetric(String name, @Nullable String desc, @NotNull 
long[] bounds) {
+        super(name, desc);
+
+        assert bounds != null && bounds.length > 0;
+        assert isSorted(bounds);
+
+        this.bounds = bounds;
+        this.measurements = new AtomicLongArray(bounds.length + 1);
+    }
+
+    /**
+     * Check whether given array is sorted.
+     *
+     * @param arr Array to check.
+     * @return {@code True} if array sorted, {@code false} otherwise.
+     */
+    private static boolean isSorted(@NotNull long[] arr) {
+        if (arr.length < 2) {
+            return true;
+        }
+
+        for (int i = 1; i < arr.length; i++) {
+            if (arr[i - 1] > arr[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Adds a value to the interval which the value belongs to.
+     *
+     * @param x Value.
+     */
+    public void add(long x) {
+        assert x >= 0;
+
+        //Expect arrays of few elements.
+        for (int i = 0; i < bounds.length; i++) {
+            if (x <= bounds[i]) {
+                measurements.incrementAndGet(i);
+
+                return;
+            }
+        }
+
+        measurements.incrementAndGet(bounds.length);
+    }
+
+    /** {@inheritDoc} */
+    public long[] value() {
+        long[] res = new long[measurements.length()];
+
+        for (int i = 0; i < measurements.length(); i++) {
+            res[i] = measurements.get(i);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable String getValueAsString() {

Review Comment:
   Why is it annotated as `@Nullable`?



##########
modules/metrics/src/main/java/org/apache/ignite/internal/metrics/DistributionMetric.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Distribution metric calculates counts of measurements that gets into each 
bounds interval.
+ * Note, that {@link #value()} will return array length of {@code 
bounds.length + 1}.
+ * Last element contains count of measurements bigger than most right value of 
bounds.
+ */
+public class DistributionMetric extends AbstractMetric implements 
CompositeMetric {
+    /** Count of measurement for each bound. */
+    private final AtomicLongArray measurements;
+
+    /** Bounds of measurements. */
+    private final long[] bounds;
+
+    /** List of scalar metrics. */
+    private volatile List<Metric> scalarMetrics = null;
+
+    /**
+     * The constructor.
+     *
+     * @param name Name.
+     * @param desc Description.
+     * @param bounds Bounds of the buckets.
+     */
+    public DistributionMetric(String name, @Nullable String desc, @NotNull 
long[] bounds) {
+        super(name, desc);
+
+        assert bounds != null && bounds.length > 0;
+        assert isSorted(bounds);
+
+        this.bounds = bounds;
+        this.measurements = new AtomicLongArray(bounds.length + 1);
+    }
+
+    /**
+     * Check whether given array is sorted.
+     *
+     * @param arr Array to check.
+     * @return {@code True} if array sorted, {@code false} otherwise.
+     */
+    private static boolean isSorted(@NotNull long[] arr) {
+        if (arr.length < 2) {
+            return true;
+        }
+
+        for (int i = 1; i < arr.length; i++) {
+            if (arr[i - 1] > arr[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Adds a value to the interval which the value belongs to.
+     *
+     * @param x Value.
+     */
+    public void add(long x) {
+        assert x >= 0;
+
+        //Expect arrays of few elements.
+        for (int i = 0; i < bounds.length; i++) {
+            if (x <= bounds[i]) {
+                measurements.incrementAndGet(i);
+
+                return;
+            }
+        }
+
+        measurements.incrementAndGet(bounds.length);
+    }
+
+    /** {@inheritDoc} */
+    public long[] value() {
+        long[] res = new long[measurements.length()];
+
+        for (int i = 0; i < measurements.length(); i++) {
+            res[i] = measurements.get(i);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable String getValueAsString() {

Review Comment:
   Method annotation must be on the previous line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to