anton-vinogradov commented on a change in pull request #8159:
URL: https://github.com/apache/ignite/pull/8159#discussion_r474583229
##########
File path:
modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -17,30 +17,111 @@
package org.apache.ignite.internal.ducktest.tests;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log =
LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get("cacheName").asText();
+ boolean infinite = jsonNode.hasNonNull("infinite") &&
jsonNode.get("infinite").asBoolean();
+ int range = jsonNode.get("range").asInt();
+
+ if (infinite) {
+ boolean error = true;
+ AtomicInteger cycle = new AtomicInteger();
+
+ log.info("Generating data in background...");
+
+ try {
+ while (active()) {
+ generateData(cacheName, range, (idx) -> idx + cycle.get(),
true, cycle.get() > 0);
- IgniteCache<Integer, Integer> cache =
ignite.createCache(jsonNode.get("cacheName").asText());
+ cycle.incrementAndGet();
Review comment:
seems to be overcomplicated
##########
File path:
modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -17,30 +17,111 @@
package org.apache.ignite.internal.ducktest.tests;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log =
LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get("cacheName").asText();
+ boolean infinite = jsonNode.hasNonNull("infinite") &&
jsonNode.get("infinite").asBoolean();
+ int range = jsonNode.get("range").asInt();
+
+ if (infinite) {
+ boolean error = true;
+ AtomicInteger cycle = new AtomicInteger();
+
+ log.info("Generating data in background...");
+
+ try {
+ while (active()) {
+ generateData(cacheName, range, (idx) -> idx + cycle.get(),
true, cycle.get() > 0);
- IgniteCache<Integer, Integer> cache =
ignite.createCache(jsonNode.get("cacheName").asText());
+ cycle.incrementAndGet();
+ }
- try (IgniteDataStreamer<Integer, Integer> stmr =
ignite.dataStreamer(cache.getName())) {
- for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
- stmr.addData(i, i);
+ log.info("Background data generation finished.");
- if (i % 10_000 == 0)
- log.info("Streamed " + i + " entries");
+ error = false;
+ }
+ catch (Throwable e) {
+ // The data streamer fails with an error on node stoppage
event before the termination.
+ if (X.hasCause(e, NodeStoppingException.class))
+ error = false;
+ else if (e instanceof Exception)
+ log.error("Failed to generate data in background.", e);
+ }
+ finally {
+ if (error)
+ markBroken();
+ else
+ markFinished(false);
Review comment:
There is an architectural leak here.
The application should not be aware of the shutdown hook.
##########
File path:
modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
##########
@@ -102,24 +107,28 @@ protected void markInitialized() {
inited = true;
}
- /**
- *
- */
- protected void markFinished() {
+ /** */
+ protected void markFinished(boolean removeShutdownHook) {
assert !finished;
assert !broken;
log.info(APP_FINISHED);
- removeShutdownHook();
+ if (removeShutdownHook)
+ removeShutdownHook();
finished = true;
}
+ /** */
Review comment:
AFAIK, single-line javadocs are allowed only for fields.
##########
File path:
modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -17,30 +17,111 @@
package org.apache.ignite.internal.ducktest.tests;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log =
LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get("cacheName").asText();
+ boolean infinite = jsonNode.hasNonNull("infinite") &&
jsonNode.get("infinite").asBoolean();
+ int range = jsonNode.get("range").asInt();
+
+ if (infinite) {
+ boolean error = true;
+ AtomicInteger cycle = new AtomicInteger();
Review comment:
A newline is required here because the variables have different purposes.
##########
File path:
modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -17,30 +17,111 @@
package org.apache.ignite.internal.ducktest.tests;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log =
LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get("cacheName").asText();
+ boolean infinite = jsonNode.hasNonNull("infinite") &&
jsonNode.get("infinite").asBoolean();
+ int range = jsonNode.get("range").asInt();
+
+ if (infinite) {
+ boolean error = true;
+ AtomicInteger cycle = new AtomicInteger();
+
+ log.info("Generating data in background...");
+
+ try {
+ while (active()) {
+ generateData(cacheName, range, (idx) -> idx + cycle.get(),
true, cycle.get() > 0);
- IgniteCache<Integer, Integer> cache =
ignite.createCache(jsonNode.get("cacheName").asText());
+ cycle.incrementAndGet();
+ }
- try (IgniteDataStreamer<Integer, Integer> stmr =
ignite.dataStreamer(cache.getName())) {
- for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
- stmr.addData(i, i);
+ log.info("Background data generation finished.");
- if (i % 10_000 == 0)
- log.info("Streamed " + i + " entries");
+ error = false;
+ }
+ catch (Throwable e) {
+ // The data streamer fails with an error on node stoppage
event before the termination.
+ if (X.hasCause(e, NodeStoppingException.class))
+ error = false;
+ else if (e instanceof Exception)
+ log.error("Failed to generate data in background.", e);
+ }
+ finally {
+ if (error)
+ markBroken();
+ else
+ markFinished(false);
Review comment:
Just do not catch such exceptions. The app will be marked as broken at
IgniteAwareApplication#start
##########
File path:
modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -17,30 +17,111 @@
package org.apache.ignite.internal.ducktest.tests;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log =
LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get("cacheName").asText();
+ boolean infinite = jsonNode.hasNonNull("infinite") &&
jsonNode.get("infinite").asBoolean();
+ int range = jsonNode.get("range").asInt();
+
+ if (infinite) {
+ boolean error = true;
+ AtomicInteger cycle = new AtomicInteger();
+
+ log.info("Generating data in background...");
+
+ try {
+ while (active()) {
+ generateData(cacheName, range, (idx) -> idx + cycle.get(),
true, cycle.get() > 0);
- IgniteCache<Integer, Integer> cache =
ignite.createCache(jsonNode.get("cacheName").asText());
+ cycle.incrementAndGet();
+ }
- try (IgniteDataStreamer<Integer, Integer> stmr =
ignite.dataStreamer(cache.getName())) {
- for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
- stmr.addData(i, i);
+ log.info("Background data generation finished.");
- if (i % 10_000 == 0)
- log.info("Streamed " + i + " entries");
+ error = false;
+ }
+ catch (Throwable e) {
+ // The data streamer fails with an error on node stoppage
event before the termination.
+ if (X.hasCause(e, NodeStoppingException.class))
+ error = false;
+ else if (e instanceof Exception)
+ log.error("Failed to generate data in background.", e);
+ }
+ finally {
+ if (error)
+ markBroken();
+ else
+ markFinished(false);
}
}
+ else {
+ log.info("Generating data...");
+
+ generateData(cacheName, range, Function.identity(), false, false);
- markSyncExecutionComplete();
+ log.info("Data generation finished. Generated " + range + "
entries.");
+
+ markSyncExecutionComplete();
+ }
+ }
+
+ /** */
+ private void generateData(String cacheName, int range, Function<Integer,
Integer> supplier, boolean markInited,
+ boolean overwrite) {
+ long notifyTime = System.nanoTime();
+ int streamed = 0;
Review comment:
A newline is required here because the variables have different purposes.
##########
File path:
modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -17,30 +17,111 @@
package org.apache.ignite.internal.ducktest.tests;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log =
LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get("cacheName").asText();
+ boolean infinite = jsonNode.hasNonNull("infinite") &&
jsonNode.get("infinite").asBoolean();
+ int range = jsonNode.get("range").asInt();
+
+ if (infinite) {
+ boolean error = true;
+ AtomicInteger cycle = new AtomicInteger();
+
+ log.info("Generating data in background...");
+
+ try {
+ while (active()) {
+ generateData(cacheName, range, (idx) -> idx + cycle.get(),
true, cycle.get() > 0);
- IgniteCache<Integer, Integer> cache =
ignite.createCache(jsonNode.get("cacheName").asText());
+ cycle.incrementAndGet();
+ }
- try (IgniteDataStreamer<Integer, Integer> stmr =
ignite.dataStreamer(cache.getName())) {
- for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
- stmr.addData(i, i);
+ log.info("Background data generation finished.");
- if (i % 10_000 == 0)
- log.info("Streamed " + i + " entries");
+ error = false;
+ }
+ catch (Throwable e) {
+ // The data streamer fails with an error on node stoppage
event before the termination.
+ if (X.hasCause(e, NodeStoppingException.class))
+ error = false;
+ else if (e instanceof Exception)
+ log.error("Failed to generate data in background.", e);
+ }
+ finally {
+ if (error)
+ markBroken();
+ else
+ markFinished(false);
}
}
+ else {
+ log.info("Generating data...");
+
+ generateData(cacheName, range, Function.identity(), false, false);
- markSyncExecutionComplete();
+ log.info("Data generation finished. Generated " + range + "
entries.");
+
+ markSyncExecutionComplete();
+ }
+ }
+
+ /** */
+ private void generateData(String cacheName, int range, Function<Integer,
Integer> supplier, boolean markInited,
+ boolean overwrite) {
+ long notifyTime = System.nanoTime();
+ int streamed = 0;
+
+ if (log.isDebugEnabled())
+ log.debug("Creating cache...");
+
+ ignite.getOrCreateCache(cacheName);
+
+ try (IgniteDataStreamer<Integer, Integer> streamer =
ignite.dataStreamer(cacheName)) {
+ streamer.allowOverwrite(overwrite);
+
+ for (int i = 0; i < range && active(); i++) {
+ streamer.addData(i, supplier.apply(i));
Review comment:
any reason to spend time on transforming the value?
##########
File path:
modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -17,30 +17,111 @@
package org.apache.ignite.internal.ducktest.tests;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log =
LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get("cacheName").asText();
+ boolean infinite = jsonNode.hasNonNull("infinite") &&
jsonNode.get("infinite").asBoolean();
+ int range = jsonNode.get("range").asInt();
+
+ if (infinite) {
+ boolean error = true;
+ AtomicInteger cycle = new AtomicInteger();
+
+ log.info("Generating data in background...");
+
+ try {
+ while (active()) {
+ generateData(cacheName, range, (idx) -> idx + cycle.get(),
true, cycle.get() > 0);
- IgniteCache<Integer, Integer> cache =
ignite.createCache(jsonNode.get("cacheName").asText());
+ cycle.incrementAndGet();
+ }
- try (IgniteDataStreamer<Integer, Integer> stmr =
ignite.dataStreamer(cache.getName())) {
- for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
- stmr.addData(i, i);
+ log.info("Background data generation finished.");
- if (i % 10_000 == 0)
- log.info("Streamed " + i + " entries");
+ error = false;
+ }
+ catch (Throwable e) {
+ // The data streamer fails with an error on node stoppage
event before the termination.
+ if (X.hasCause(e, NodeStoppingException.class))
+ error = false;
+ else if (e instanceof Exception)
+ log.error("Failed to generate data in background.", e);
+ }
+ finally {
+ if (error)
+ markBroken();
+ else
+ markFinished(false);
}
}
+ else {
+ log.info("Generating data...");
+
+ generateData(cacheName, range, Function.identity(), false, false);
- markSyncExecutionComplete();
+ log.info("Data generation finished. Generated " + range + "
entries.");
+
+ markSyncExecutionComplete();
+ }
+ }
+
+ /** */
+ private void generateData(String cacheName, int range, Function<Integer,
Integer> supplier, boolean markInited,
+ boolean overwrite) {
+ long notifyTime = System.nanoTime();
+ int streamed = 0;
+
+ if (log.isDebugEnabled())
+ log.debug("Creating cache...");
+
+ ignite.getOrCreateCache(cacheName);
+
+ try (IgniteDataStreamer<Integer, Integer> streamer =
ignite.dataStreamer(cacheName)) {
+ streamer.allowOverwrite(overwrite);
+
+ for (int i = 0; i < range && active(); i++) {
+ streamer.addData(i, supplier.apply(i));
+
+ if (notifyTime + DATAGEN_NOTIFY_INTERVAL_NANO <
System.nanoTime() ||
+ i - streamed >= DATAGEN_NOTIFY_INTERVAL_AMOUNT) {
+ notifyTime = System.nanoTime();
+
+ if (markInited && !inited())
+ markInitialized();
Review comment:
Not sure I understand how the notification is related to the initialization.
How about relocating the init to the loop (generate)?
##########
File path:
modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
##########
@@ -76,8 +76,11 @@ protected IgniteAwareApplication() {
else
log.info("Application already done [finished=" + finished + ",
broken=" + broken + "]");
+ log.info("Waiting for graceful termination...");
+
while (!finished && !broken) {
- log.info("Waiting for graceful termnation.");
+ if (log.isTraceEnabled())
+ log.trace("Waiting for graceful termination cycle...");
Review comment:
do we really need this additional logging?
##########
File path:
modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/DataGenerationApplication.java
##########
@@ -18,29 +18,139 @@
package org.apache.ignite.internal.ducktest.tests;
import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
*
*/
public class DataGenerationApplication extends IgniteAwareApplication {
+ /** Logger. */
+ protected static final Logger log =
LogManager.getLogger(DataGenerationApplication.class.getName());
+
+ /** */
+ private static final String PARAM_RANGE = "range";
+
+ /** */
+ private static final String PARAM_INFINITE = "infinite";
+
+ /** */
+ private static final String PARAM_CACHE_NAME = "cacheName";
+
+ /** */
+ private static final String PARAM_OPTIMIZED = "optimized";
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_NANO = 1500 * 1000000L;
+
+ /** */
+ private static final long DATAGEN_NOTIFY_INTERVAL_AMOUNT = 10_000;
+
+ /** */
+ private static final String WATCHEABLE_BEGIN_DATA_GEN_MSG = "Begin
generating data in background...";
+
/** {@inheritDoc} */
@Override protected void run(JsonNode jsonNode) {
- log.info("Creating cache...");
+ String cacheName = jsonNode.get(PARAM_CACHE_NAME).asText();
+ boolean infinite = jsonNode.hasNonNull(PARAM_INFINITE) &&
jsonNode.get(PARAM_INFINITE).asBoolean();
+ boolean optimized = !jsonNode.hasNonNull(PARAM_OPTIMIZED) ||
jsonNode.get(PARAM_OPTIMIZED).asBoolean();
+ int range = jsonNode.get(PARAM_RANGE).asInt();
+
+ if (infinite) {
+ Random rnd = new Random();
+ CountDownLatch exitLatch = new CountDownLatch(1);
+
+ Thread th = new Thread(() -> {
+ log.info(WATCHEABLE_BEGIN_DATA_GEN_MSG);
+
+ boolean error = false;
+
+ try {
+ while (!terminated())
+ generateData(cacheName, range, (idx) ->
rnd.nextInt(range), optimized);
+
+ log.info("Background data generation finished.");
+ }
+ catch (Exception e) {
+ if (!X.hasCause(e, NodeStoppingException.class)) {
+ error = true;
+
+ log.error("Failed to generate data in background.", e);
+ }
+ }
+ finally {
+ if (error)
+ markBroken();
+ else
+ markFinished();
+
+ exitLatch.countDown();
+ }
+
+ }, DataGenerationApplication.class.getName() + "_cacheLoader");
- IgniteCache<Integer, Integer> cache =
ignite.createCache(jsonNode.get("cacheName").asText());
+ th.start();
- try (IgniteDataStreamer<Integer, Integer> stmr =
ignite.dataStreamer(cache.getName())) {
- for (int i = 0; i < jsonNode.get("range").asInt(); i++) {
- stmr.addData(i, i);
+ markInitialized();
- if (i % 10_000 == 0)
- log.info("Streamed " + i + " entries");
+ try {
+ exitLatch.await();
}
+ catch (InterruptedException e) {
+ log.warn("Interrupted waiting for background loading.");
+ }
+ }
+ else {
+ log.info("Generating data...");
+
+ generateData(cacheName, range, Function.identity(), optimized);
+
+ log.info("Data generation finished. Generated " + range + "
entries.");
+
+ markSyncExecutionComplete();
}
+ }
+
+ /** */
+ private void generateData(String cacheName, int range, Function<Integer,
Integer> supplier, boolean optimized) {
+ long notifyTime = System.nanoTime();
+ int streamed = 0;
+
+ if(log.isDebugEnabled())
+ log.debug("Creating cache...");
- markSyncExecutionComplete();
+ IgniteCache<Integer, Integer> cache =
ignite.getOrCreateCache(cacheName);
+
+ try (IgniteDataStreamer<Integer, Integer> streamer =
ignite.dataStreamer(cacheName)) {
+ streamer.allowOverwrite(true);
Review comment:
If you need to continue streaming on topology change, just restart it.
I don't like the idea of changing the semantics on the fly.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]