anton-vinogradov commented on a change in pull request #8159:
URL: https://github.com/apache/ignite/pull/8159#discussion_r475409323
##########
File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -20,27 +20,103 @@
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log = LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final int DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+ /** */
+ private static final int DELAYED_INITIALIZATION_AMOUNT = 10_000;
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get("cacheName").asText();
+ boolean infinite = jsonNode.hasNonNull("infinite") && jsonNode.get("infinite").asBoolean();
+ boolean optimized = !jsonNode.hasNonNull("optimized") || jsonNode.get("optimized").asBoolean();
+ int range = jsonNode.get("range").asInt();
- IgniteCache<Integer, Integer> cache = ignite.createCache(jsonNode.get("cacheName").asText());
+ if (infinite) {
+ log.info("Generating data in background...");
- try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(cache.getName())) {
- for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
- stmr.addData(i, i);
+ while (active()) {
+ generateData(cacheName, range, true, optimized, true);
- if (i % 10_000 == 0)
- log.info("Streamed " + i + " entries");
+ // Delayed initialization for small data amount ( < DELAYED_INITIALIZATION_AMOUNT ).
Review comment:
This makes the behavior counter-intuitive.
A small range should not force the warmup to finish.
A range of 10 (keys 0-9) combined with a 10M warmup is a realistic case for small memory consumption checks.
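A minimal sketch of that decoupling, for illustration only — the warmupEntries count and the wrap-around keying are assumptions, not something this PR defines:

    // Sketch: warmup length is an explicit entry count, independent of the key range,
    // so range = 10 with warmupEntries = 10_000_000 keeps rewriting the wrapped keys 0-9.
    private void warmup(IgniteCache<Integer, Integer> cache, int range, long warmupEntries) {
        for (long i = 0; i < warmupEntries; i++)
            cache.put((int)(i % range), (int)i);
    }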
##########
File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -20,27 +20,103 @@
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log = LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final int DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+ /** */
+ private static final int DELAYED_INITIALIZATION_AMOUNT = 10_000;
Review comment:
Let it be a "warmup" option.
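Reading such an option could mirror the hasNonNull(...) pattern already used in run() — the "warmup" key name and its default are assumptions for illustration:

    // Hypothetical optional parameter; falls back to a default when the JSON field is absent.
    int warmup = jsonNode.hasNonNull("warmup") ? jsonNode.get("warmup").asInt() : DELAYED_INITIALIZATION_AMOUNT;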
##########
File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
##########
@@ -165,6 +173,16 @@ protected boolean terminated() {
return terminated;
}
+ /** */
Review comment:
One-line Javadocs are not allowed for methods.
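For reference, the expanded multi-line form looks like this (the method and field shown here are hypothetical):

    /**
     * @return {@code True} if the application has been marked as initialized.
     */
    protected boolean inited() {
        return inited;
    }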
##########
File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -20,27 +20,103 @@
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log = LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final int DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+ /** */
+ private static final int DELAYED_INITIALIZATION_AMOUNT = 10_000;
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get("cacheName").asText();
+ boolean infinite = jsonNode.hasNonNull("infinite") && jsonNode.get("infinite").asBoolean();
+ boolean optimized = !jsonNode.hasNonNull("optimized") || jsonNode.get("optimized").asBoolean();
+ int range = jsonNode.get("range").asInt();
- IgniteCache<Integer, Integer> cache = ignite.createCache(jsonNode.get("cacheName").asText());
+ if (infinite) {
+ log.info("Generating data in background...");
- try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(cache.getName())) {
- for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
- stmr.addData(i, i);
+ while (active()) {
+ generateData(cacheName, range, true, optimized, true);
- if (i % 10_000 == 0)
- log.info("Streamed " + i + " entries");
+ // Delayed initialization for small data amount ( < DELAYED_INITIALIZATION_AMOUNT ).
+ if (!inited())
+ markInitialized();
}
+
+ log.info("Background data generation finished.");
+
+ markFinished();
}
+ else {
+ log.info("Generating data...");
+
+ generateData(cacheName, range, false, optimized, false);
+
+ log.info("Data generation finished. Generated " + range + " entries.");
+
+ markSyncExecutionComplete();
+ }
+ }
+
+ /** */
+ private void generateData(String cacheName, int range, boolean delayedInit, boolean optimized, boolean overwrite) {
+ long notifyTime = System.nanoTime();
- markSyncExecutionComplete();
+ int streamed = 0;
+
+ if (log.isDebugEnabled())
+ log.debug("Creating cache...");
+
+ IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName);
+
+ try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cacheName)) {
Review comment:
There is no reason to create a DataStreamer if (when) cache.put is used.
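A minimal sketch of the suggested branching, assuming this PR's optimized flag selects the write path — the loop bodies are illustrative only:

    // Only the optimized (streaming) path opens a DataStreamer;
    // the non-optimized path uses plain cache.put and never creates one.
    if (optimized) {
        try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(cacheName)) {
            streamer.allowOverwrite(overwrite);

            for (int i = 0; i < range; i++)
                streamer.addData(i, i);
        }
    }
    else {
        IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName);

        for (int i = 0; i < range; i++)
            cache.put(i, i);
    }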
##########
File path: modules/ducktests/tests/ignitetest/tests/discovery_test.py
##########
@@ -63,143 +75,86 @@ def __init__(self, test_context):
super(DiscoveryTest, self).__init__(test_context=test_context)
self.zk_quorum = None
self.servers = None
+ self.loader = None
- def __start_zk_quorum(self):
- self.zk_quorum = ZookeeperService(self.test_context, 3)
-
- self.stage("Starting ZooKeeper quorum")
-
- self.zk_quorum.start()
+ @cluster(num_nodes=NUM_NODES)
+ @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+ kill_coordinator=[False, True],
+ nodes_to_kill=[1, 2],
+ with_load=[False, True])
+ def test_tcp(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
+ """
+ Test nodes failure scenario with TcpDiscoverySpi.
+ """
+ config = DiscoveryTest.Config(nodes_to_kill, kill_coordinator, with_load)
- self.stage("ZooKeeper quorum started")
+ return self.__simulate_nodes_failure(ignite_version, self.__properties(), None, config)
- @staticmethod
- def __properties(zookeeper_settings=None):
+ @cluster(num_nodes=NUM_NODES + 3)
+ @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+ kill_coordinator=[False, True],
+ nodes_to_kill=[1, 2],
+ with_load=[False, True])
+ def test_zk(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
"""
- :param zookeeper_settings: ZooKeeperDiscoverySpi settings. If None, TcpDiscoverySpi will be used.
- :return: Rendered node's properties.
+ Test node failure scenario with ZooKeeperSpi.
"""
- return Template(DiscoveryTest.CONFIG_TEMPLATE) \
- .render(failure_detection_timeout=DiscoveryTest.FAILURE_DETECTION_TIMEOUT,
- zookeeper_settings=zookeeper_settings)
+ config = DiscoveryTest.Config(nodes_to_kill, kill_coordinator, with_load)
- @staticmethod
- def __zk_properties(connection_string):
- return DiscoveryTest.__properties(zookeeper_settings={'connection_string': connection_string})
+ self.__start_zk_quorum()
+
+ properties = self.__zk_properties(self.zk_quorum.connection_string())
+ modules = ["zookeeper"]
+
+ return self.__simulate_nodes_failure(ignite_version, properties, modules, config)
def setUp(self):
pass
def teardown(self):
- if self.zk_quorum:
- self.zk_quorum.stop()
+ if self.loader:
+ self.loader.stop()
if self.servers:
self.servers.stop()
- @cluster(num_nodes=NUM_NODES)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_tcp_not_coordinator_single(self, version):
- """
- Test single-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
- """
- return self.__simulate_nodes_failure(version, self.__properties(), 1)
-
- @cluster(num_nodes=NUM_NODES)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_tcp_not_coordinator_two(self, version):
- """
- Test two-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
- """
- return self.__simulate_nodes_failure(version, self.__properties(), 2)
-
- @cluster(num_nodes=NUM_NODES)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_tcp_coordinator(self, version):
- """
- Test coordinator-failure scenario with TcpDiscoverySpi.
- """
- return self.__simulate_nodes_failure(version, self.__properties(), 0)
-
- @cluster(num_nodes=NUM_NODES + 3)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_zk_not_coordinator_single(self, version):
- """
- Test single node failure scenario (not the coordinator) with ZooKeeper.
- """
- self.__start_zk_quorum()
-
- return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 1)
-
- @cluster(num_nodes=NUM_NODES + 3)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_zk_not_coordinator_two(self, version):
- """
- Test two-node-failure scenario (not the coordinator) with ZooKeeper.
- """
- self.__start_zk_quorum()
-
- return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 2)
-
- @cluster(num_nodes=NUM_NODES+3)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_zk_coordinator(self, version):
- """
- Test coordinator-failure scenario with ZooKeeper.
- """
- self.__start_zk_quorum()
+ if self.zk_quorum:
+ self.zk_quorum.stop()
- return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 0)
+ def __simulate_nodes_failure(self, version, properties, modules, config):
+ if config.nodes_to_kill == 0:
+ return {"No nodes to kill": "Nothing to do"}
- def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
- """
- :param nodes_to_kill: How many nodes to kill. If <1, the coordinator is the choice. Otherwise: not-coordinator
- nodes of given number.
- """
self.servers = IgniteService(
self.test_context,
- num_nodes=self.NUM_NODES,
- modules=["zookeeper"],
+ num_nodes=self.NUM_NODES - 1,
+ modules=modules,
properties=properties,
version=version)
- self.stage("Starting ignite cluster")
-
time_holder = self.monotonic()
self.servers.start()
- if nodes_to_kill > self.servers.num_nodes - 1:
- raise Exception("Too many nodes to kill: " + str(nodes_to_kill))
-
data = {'Ignite cluster start time (s)': round(self.monotonic() - time_holder, 1)}
- self.stage("Topology is ready")
- failed_nodes, survived_node = self.__choose_node_to_kill(nodes_to_kill)
+ failed_nodes, survived_node = self.__choose_node_to_kill(config.kill_coordinator, config.nodes_to_kill)
ids_to_wait = [node.discovery_info().node_id for node in failed_nodes]
- self.stage("Stopping " + str(len(failed_nodes)) + " nodes.")
+ if config.with_load:
+ self.__start_loading(version, properties, modules)
first_terminated = self.servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)
- self.stage("Waiting for failure detection of " + str(len(failed_nodes)) + " nodes.")
-
# Keeps dates of logged node failures.
logged_timestamps = []
for failed_id in ids_to_wait:
- self.servers.await_event_on_node(self.__failed_pattern(failed_id), survived_node, 10,
+ self.servers.await_event_on_node(self.__failed_pattern(failed_id), survived_node, 20,
from_the_beginning=True, backoff_sec=0.01)
# Save mono of last detected failure.
time_holder = self.monotonic()
Review comment:
How about renaming this to last_blabla (as opposed to first_terminated)?
##########
File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -20,27 +20,103 @@
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log = LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final int DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+ /** */
+ private static final int DELAYED_INITIALIZATION_AMOUNT = 10_000;
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get("cacheName").asText();
+ boolean infinite = jsonNode.hasNonNull("infinite") && jsonNode.get("infinite").asBoolean();
+ boolean optimized = !jsonNode.hasNonNull("optimized") || jsonNode.get("optimized").asBoolean();
Review comment:
How about "batched" or something like that?
##########
File path: modules/ducktests/tests/ignitetest/tests/discovery_test.py
##########
@@ -63,143 +75,86 @@ def __init__(self, test_context):
super(DiscoveryTest, self).__init__(test_context=test_context)
self.zk_quorum = None
self.servers = None
+ self.loader = None
- def __start_zk_quorum(self):
- self.zk_quorum = ZookeeperService(self.test_context, 3)
-
- self.stage("Starting ZooKeeper quorum")
-
- self.zk_quorum.start()
+ @cluster(num_nodes=NUM_NODES)
+ @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+ kill_coordinator=[False, True],
+ nodes_to_kill=[0, 1, 2],
+ with_load=[False, True])
+ def test_tcp(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
+ """
+ Test nodes failure scenario with TcpDiscoverySpi.
+ """
+ config = DiscoveryTest.Config(nodes_to_kill, kill_coordinator, with_load)
- self.stage("ZooKeeper quorum started")
+ return self.__simulate_nodes_failure(ignite_version, self.__properties(), None, config)
- @staticmethod
- def __properties(zookeeper_settings=None):
+ @cluster(num_nodes=NUM_NODES + 3)
+ @matrix(ignite_version=[str(DEV_BRANCH), str(LATEST_2_8)],
+ kill_coordinator=[False, True],
+ nodes_to_kill=[0, 1, 2],
+ with_load=[False, True])
+ def test_zk(self, ignite_version, kill_coordinator, nodes_to_kill, with_load):
"""
- :param zookeeper_settings: ZooKeeperDiscoverySpi settings. If None, TcpDiscoverySpi will be used.
- :return: Rendered node's properties.
+ Test node failure scenario with ZooKeeperSpi.
"""
- return Template(DiscoveryTest.CONFIG_TEMPLATE) \
- .render(failure_detection_timeout=DiscoveryTest.FAILURE_DETECTION_TIMEOUT,
- zookeeper_settings=zookeeper_settings)
+ config = DiscoveryTest.Config(nodes_to_kill, kill_coordinator, with_load)
- @staticmethod
- def __zk_properties(connection_string):
- return DiscoveryTest.__properties(zookeeper_settings={'connection_string': connection_string})
+ self.__start_zk_quorum()
+
+ properties = self.__zk_properties(self.zk_quorum.connection_string())
+ modules = ["zookeeper"]
+
+ return self.__simulate_nodes_failure(ignite_version, properties, modules, config)
def setUp(self):
pass
def teardown(self):
- if self.zk_quorum:
- self.zk_quorum.stop()
+ if self.loader:
+ self.loader.stop()
if self.servers:
self.servers.stop()
- @cluster(num_nodes=NUM_NODES)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_tcp_not_coordinator_single(self, version):
- """
- Test single-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
- """
- return self.__simulate_nodes_failure(version, self.__properties(), 1)
-
- @cluster(num_nodes=NUM_NODES)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_tcp_not_coordinator_two(self, version):
- """
- Test two-node-failure scenario (not the coordinator) with TcpDiscoverySpi.
- """
- return self.__simulate_nodes_failure(version, self.__properties(), 2)
-
- @cluster(num_nodes=NUM_NODES)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_tcp_coordinator(self, version):
- """
- Test coordinator-failure scenario with TcpDiscoverySpi.
- """
- return self.__simulate_nodes_failure(version, self.__properties(), 0)
-
- @cluster(num_nodes=NUM_NODES + 3)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_zk_not_coordinator_single(self, version):
- """
- Test single node failure scenario (not the coordinator) with ZooKeeper.
- """
- self.__start_zk_quorum()
-
- return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 1)
-
- @cluster(num_nodes=NUM_NODES + 3)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_zk_not_coordinator_two(self, version):
- """
- Test two-node-failure scenario (not the coordinator) with ZooKeeper.
- """
- self.__start_zk_quorum()
-
- return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 2)
-
- @cluster(num_nodes=NUM_NODES+3)
- @parametrize(version=str(DEV_BRANCH))
- @parametrize(version=str(LATEST_2_7))
- def test_zk_coordinator(self, version):
- """
- Test coordinator-failure scenario with ZooKeeper.
- """
- self.__start_zk_quorum()
+ if self.zk_quorum:
+ self.zk_quorum.stop()
- return self.__simulate_nodes_failure(version, self.__zk_properties(self.zk_quorum.connection_string()), 0)
+ def __simulate_nodes_failure(self, version, properties, modules, config):
+ if config.nodes_to_kill == 0 and not config.kill_coordinator:
+ return {"No nodes to kill": "Nothing to do"}
- def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
- """
- :param nodes_to_kill: How many nodes to kill. If <1, the coordinator is the choice. Otherwise: not-coordinator
- nodes of given number.
- """
self.servers = IgniteService(
self.test_context,
- num_nodes=self.NUM_NODES,
- modules=["zookeeper"],
+ num_nodes=self.NUM_NODES - 1,
+ modules=modules,
properties=properties,
version=version)
- self.stage("Starting ignite cluster")
-
time_holder = self.monotonic()
self.servers.start()
- if nodes_to_kill > self.servers.num_nodes - 1:
- raise Exception("Too many nodes to kill: " + str(nodes_to_kill))
-
data = {'Ignite cluster start time (s)': round(self.monotonic() - time_holder, 1)}
- self.stage("Topology is ready")
- failed_nodes, survived_node = self.__choose_node_to_kill(nodes_to_kill)
+ failed_nodes, survived_node = self.__choose_node_to_kill(config.kill_coordinator, config.nodes_to_kill)
ids_to_wait = [node.discovery_info().node_id for node in failed_nodes]
- self.stage("Stopping " + str(len(failed_nodes)) + " nodes.")
+ if config.with_load:
+ self.__start_loading(version, properties, modules)
first_terminated = self.servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)
- self.stage("Waiting for failure detection of " + str(len(failed_nodes)) + " nodes.")
-
# Keeps dates of logged node failures.
logged_timestamps = []
for failed_id in ids_to_wait:
- self.servers.await_event_on_node(self.__failed_pattern(failed_id), survived_node, 10,
+ self.servers.await_event_on_node(self.__failed_pattern(failed_id), survived_node, 20,
Review comment:
Does it really make sense to have backoff_sec=0.01?
##########
File path: modules/ducktests/tests/ignitetest/tests/discovery_test.py
##########
@@ -210,37 +165,75 @@ def __simulate_nodes_failure(self, version, properties, nodes_to_kill=1):
logged_timestamps.sort(reverse=True)
- # Failure detection delay.
- time_holder = int((time_holder - first_terminated[0]) * 1000)
- # Failure detection delay by log.
- by_log = epoch_mills(logged_timestamps[0]) - epoch_mills(first_terminated[1])
+ self.__check_and_store_results(data, int((time_holder - first_terminated[0]) * 1000),
+ epoch_mills(logged_timestamps[0]) - epoch_mills(first_terminated[1]))
Review comment:
How about also providing the raw values here, i.e. duration1 and duration2?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]