ivandasch commented on a change in pull request #9142: URL: https://github.com/apache/ignite/pull/9142#discussion_r656014060
########## File path: modules/ducktests/tests/ignitetest/tests/rebalance/util.py ########## @@ -0,0 +1,290 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Utils for rebalanced tests. +""" + +import sys +from enum import IntEnum +from typing import NamedTuple + +# pylint: disable=W0622 +from ducktape.errors import TimeoutError + +# pylint: disable=too-many-arguments +from ignitetest.services.ignite import IgniteService +from ignitetest.services.ignite_app import IgniteApplicationService +from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration +from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration +from ignitetest.utils.enum import constructible +from ignitetest.utils.version import IgniteVersion + +NUM_NODES = 4 +DEFAULT_DATA_REGION_SZ = 1 << 30 + + +@constructible +class TriggerEvent(IntEnum): + """ + Rebalance trigger event. + """ + NODE_JOIN = 0 + NODE_LEFT = 1 + + +# pylint: disable=R0914 +def preload_data(context, config, preloaders, backups, cache_count, entry_count, entry_size, timeout=3600): + """ + Puts entry_count of key-value pairs of entry_size bytes to cache_count caches. + :param context: Test context. + :param config: Ignite configuration. + :param preloaders: Preload client nodes count. + :param backups: Cache backups count. + :param cache_count: Cache count. + :param entry_count: Cache entry count. + :param entry_size: Entry size in bytes. + :param timeout: Timeout in seconds for application finished. + :return: Time taken for data preloading. + """ + assert preloaders > 0 + assert cache_count > 0 + assert entry_count > 0 + assert entry_size > 0 + + apps = [] + + def start_app(from_, to_): + app0 = IgniteApplicationService( + context, + config=config, + java_class_name="org.apache.ignite.internal.ducktest.tests.rebalance.DataGenerationApplication", + params={ + "backups": backups, + "cacheCount": cache_count, + "entrySize": entry_size, + "from": from_, + "to": to_ + }, + shutdown_timeout_sec=timeout) + app0.start_async() + + apps.append(app0) + + count = int(entry_count / preloaders) + _from = 0 + _to = 0 + + for _ in range(preloaders - 1): + _from = _to + _to += count + start_app(_from, _to) + + start_app(_to, entry_count) + + for app1 in apps: + app1.await_stopped() + + return (max(map(lambda app: app.get_finish_time(), apps)) - + min(map(lambda app: app.get_init_time(), apps))).total_seconds() + + +def await_rebalance_start(service: IgniteService, timeout: int = 30): + """ + Awaits rebalance starting on any test-cache on any node. + :param service: IgniteService in which rebalance start will be awaited. + :param timeout: Rebalance start await timeout. + :return: dictionary of two keypairs with keys "node" and "time", + where "node" contains the first node on which rebalance start was detected + and "time" contains the time when rebalance was started. + """ + for node in service.alive_nodes: + try: + rebalance_start_time = service.get_event_time_on_node( + node, + "Starting rebalance routine", + timeout=timeout) + except TimeoutError: + continue + else: + return rebalance_start_time + + raise RuntimeError("Rebalance start was not detected on any node") + + +def get_rebalance_metrics(node, cache_group): + """ + Gets rebalance metrics for specified node and cache group. + :param node: Ignite node. + :param cache_group: Cache group. + :return: RebalanceMetrics instance. + """ + mbean = node.jmx_client().find_mbean('.*group=cacheGroups.*name="%s"' % cache_group) + start_time = int(next(mbean.RebalancingStartTime)) + end_time = int(next(mbean.RebalancingEndTime)) + + return RebalanceMetrics( + received_bytes=int(next(mbean.RebalancingReceivedBytes)), + start_time=start_time, + end_time=end_time, + duration=(end_time - start_time) if start_time != -1 and end_time != -1 else 0, + node=node.name) + + +class RebalanceMetrics(NamedTuple): + """ + Rebalance metrics + """ + received_bytes: int = 0 + start_time: int = 0 + end_time: int = 0 + duration: int = 0 + node: str = None + + +def aggregate_rebalance_stats(nodes, cache_count): + """ + Aggregates rebalance stats for specified nodes and cache count. + :param nodes: Nodes list. + :param cache_count: Cache count. + :return: Aggregated rebalance stats dictionary. + """ + def __stats(cache_idx): + cache_name = "test-cache-%d" % (cache_idx + 1) + + stats = { + "cache": cache_name, + "start_time": {}, + "end_time": {}, + "duration": {}, + "received_bytes": {} + } + + metrics = list(map(lambda node: get_rebalance_metrics(node, cache_name), nodes)) + + def __key(tup): + return tup[1] + Review comment: Lets rewrite it in loop: ```python from itertools import product, chain for prop, mode in chain(product(['start_time', 'end_time'],['min', 'max']), product(['duration','received_bytes'], ['min', 'max', 'sum'])): if mode == 'sum': val = sum(map(lambda item: getattr(item, prop))) else: val = map(lambda item: (item.node, getattr(item, prop)), metrics val = min(val, key=__key) if mode == 'min' else max(val, key=__key) stats[prop][mode] = val ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
