anton-vinogradov commented on a change in pull request #8294: URL: https://github.com/apache/ignite/pull/8294#discussion_r507623963
########## File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/UuidStreamerApplication.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Loading random uuids in cache. + */ +public class UuidStreamerApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jNode) { + String cacheName = jNode.get("cacheName").asText(); + + int dataSize = Optional.ofNullable(jNode.get("dataSize")) + .map(JsonNode::asInt) + .orElse(1024); + + long iterSize = Optional.ofNullable(jNode.get("iterSize")) + .map(JsonNode::asLong) + .orElse(-1L); + + CacheConfiguration<UUID, byte[]> cacheCfg = new CacheConfiguration<>(cacheName); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(2); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setIndexedTypes(UUID.class, byte[].class); + + ignite.getOrCreateCache(cacheCfg); + + long start = System.currentTimeMillis(); + + markInitialized(); + + workParallel(ignite, cacheName, iterSize, dataSize); + + recordResult("DURATION", System.currentTimeMillis() - start); + + markFinished(); + } + + /** */ + private void workParallel(Ignite ignite, String cacheName, long iterSize, int dataSize) { + int core = Runtime.getRuntime().availableProcessors() / 2; Review comment: cores? threads? ########## File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/UuidStreamerApplication.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Loading random uuids in cache. + */ +public class UuidStreamerApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jNode) { + String cacheName = jNode.get("cacheName").asText(); + + int dataSize = Optional.ofNullable(jNode.get("dataSize")) + .map(JsonNode::asInt) + .orElse(1024); + + long iterSize = Optional.ofNullable(jNode.get("iterSize")) + .map(JsonNode::asLong) + .orElse(-1L); + + CacheConfiguration<UUID, byte[]> cacheCfg = new CacheConfiguration<>(cacheName); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(2); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setIndexedTypes(UUID.class, byte[].class); + + ignite.getOrCreateCache(cacheCfg); + + long start = System.currentTimeMillis(); + + markInitialized(); + + workParallel(ignite, cacheName, iterSize, dataSize); + + recordResult("DURATION", System.currentTimeMillis() - start); + + markFinished(); + } + + /** */ + private void workParallel(Ignite ignite, String cacheName, long iterSize, int dataSize) { + int core = Runtime.getRuntime().availableProcessors() / 2; + + long iterCore = 0 < iterSize ? iterSize / core : iterSize; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("UuidDataStreamer-%d") + .setDaemon(true) + .build(); + + ExecutorService executors = Executors.newFixedThreadPool(core, threadFactory); + CountDownLatch latch = new CountDownLatch(core); + + for (int i = 0; i < core; i++) + executors.submit(new UuidDataStreamer(ignite, cacheName, latch, iterCore, dataSize)); Review comment: any reason to use executor service instead of threads? ########## File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/UuidStreamerApplication.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Loading random uuids in cache. + */ +public class UuidStreamerApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jNode) { + String cacheName = jNode.get("cacheName").asText(); + + int dataSize = Optional.ofNullable(jNode.get("dataSize")) + .map(JsonNode::asInt) + .orElse(1024); + + long iterSize = Optional.ofNullable(jNode.get("iterSize")) + .map(JsonNode::asLong) + .orElse(-1L); + + CacheConfiguration<UUID, byte[]> cacheCfg = new CacheConfiguration<>(cacheName); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(2); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setIndexedTypes(UUID.class, byte[].class); + + ignite.getOrCreateCache(cacheCfg); + + long start = System.currentTimeMillis(); + + markInitialized(); + + workParallel(ignite, cacheName, iterSize, dataSize); + + recordResult("DURATION", System.currentTimeMillis() - start); + + markFinished(); + } + + /** */ + private void workParallel(Ignite ignite, String cacheName, long iterSize, int dataSize) { + int core = Runtime.getRuntime().availableProcessors() / 2; + + long iterCore = 0 < iterSize ? iterSize / core : iterSize; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("UuidDataStreamer-%d") + .setDaemon(true) + .build(); + + ExecutorService executors = Executors.newFixedThreadPool(core, threadFactory); + CountDownLatch latch = new CountDownLatch(core); + + for (int i = 0; i < core; i++) + executors.submit(new UuidDataStreamer(ignite, cacheName, latch, iterCore, dataSize)); + + try { + while (true) { + if (latch.await(1, TimeUnit.SECONDS) || terminated()) + break; + } + } + catch (InterruptedException e) { + markBroken(new RuntimeException("Unexpected thread interruption", e)); + + Thread.currentThread().interrupt(); + } Review comment: why so overcomplicated instead of just latch.await()? ########## File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/UuidStreamerApplication.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Loading random uuids in cache. + */ +public class UuidStreamerApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jNode) { + String cacheName = jNode.get("cacheName").asText(); + + int dataSize = Optional.ofNullable(jNode.get("dataSize")) + .map(JsonNode::asInt) + .orElse(1024); + + long iterSize = Optional.ofNullable(jNode.get("iterSize")) + .map(JsonNode::asLong) + .orElse(-1L); + + CacheConfiguration<UUID, byte[]> cacheCfg = new CacheConfiguration<>(cacheName); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(2); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setIndexedTypes(UUID.class, byte[].class); + + ignite.getOrCreateCache(cacheCfg); + + long start = System.currentTimeMillis(); + + markInitialized(); + + workParallel(ignite, cacheName, iterSize, dataSize); + + recordResult("DURATION", System.currentTimeMillis() - start); + + markFinished(); + } + + /** */ + private void workParallel(Ignite ignite, String cacheName, long iterSize, int dataSize) { + int core = Runtime.getRuntime().availableProcessors() / 2; + + long iterCore = 0 < iterSize ? iterSize / core : iterSize; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("UuidDataStreamer-%d") + .setDaemon(true) + .build(); + + ExecutorService executors = Executors.newFixedThreadPool(core, threadFactory); Review comment: why executorS? ########## File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/UuidStreamerApplication.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Loading random uuids in cache. + */ +public class UuidStreamerApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jNode) { + String cacheName = jNode.get("cacheName").asText(); + + int dataSize = Optional.ofNullable(jNode.get("dataSize")) + .map(JsonNode::asInt) + .orElse(1024); + + long iterSize = Optional.ofNullable(jNode.get("iterSize")) + .map(JsonNode::asLong) + .orElse(-1L); + + CacheConfiguration<UUID, byte[]> cacheCfg = new CacheConfiguration<>(cacheName); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(2); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setIndexedTypes(UUID.class, byte[].class); + + ignite.getOrCreateCache(cacheCfg); + + long start = System.currentTimeMillis(); + + markInitialized(); + + workParallel(ignite, cacheName, iterSize, dataSize); + + recordResult("DURATION", System.currentTimeMillis() - start); + + markFinished(); + } + + /** */ + private void workParallel(Ignite ignite, String cacheName, long iterSize, int dataSize) { + int core = Runtime.getRuntime().availableProcessors() / 2; + + long iterCore = 0 < iterSize ? iterSize / core : iterSize; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("UuidDataStreamer-%d") + .setDaemon(true) + .build(); + + ExecutorService executors = Executors.newFixedThreadPool(core, threadFactory); + CountDownLatch latch = new CountDownLatch(core); + + for (int i = 0; i < core; i++) + executors.submit(new UuidDataStreamer(ignite, cacheName, latch, iterCore, dataSize)); + + try { + while (true) { + if (latch.await(1, TimeUnit.SECONDS) || terminated()) + break; + } + } + catch (InterruptedException e) { + markBroken(new RuntimeException("Unexpected thread interruption", e)); + + Thread.currentThread().interrupt(); + } + finally { + executors.shutdownNow(); + } + } + + /** */ + private class UuidDataStreamer implements Runnable { Review comment: Why not static? ########## File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/UuidStreamerApplication.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Loading random uuids in cache. + */ +public class UuidStreamerApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jNode) { + String cacheName = jNode.get("cacheName").asText(); + + int dataSize = Optional.ofNullable(jNode.get("dataSize")) + .map(JsonNode::asInt) + .orElse(1024); + + long iterSize = Optional.ofNullable(jNode.get("iterSize")) + .map(JsonNode::asLong) + .orElse(-1L); + + CacheConfiguration<UUID, byte[]> cacheCfg = new CacheConfiguration<>(cacheName); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(2); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setIndexedTypes(UUID.class, byte[].class); + + ignite.getOrCreateCache(cacheCfg); + + long start = System.currentTimeMillis(); + + markInitialized(); + + workParallel(ignite, cacheName, iterSize, dataSize); + + recordResult("DURATION", System.currentTimeMillis() - start); + + markFinished(); + } + + /** */ + private void workParallel(Ignite ignite, String cacheName, long iterSize, int dataSize) { + int core = Runtime.getRuntime().availableProcessors() / 2; + + long iterCore = 0 < iterSize ? iterSize / core : iterSize; Review comment: 1) why reversed iterSize > 0? 2) should we just return if iterSize <= 0? ########## File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/UuidStreamerApplication.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Loading random uuids in cache. + */ +public class UuidStreamerApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jNode) { + String cacheName = jNode.get("cacheName").asText(); + + int dataSize = Optional.ofNullable(jNode.get("dataSize")) + .map(JsonNode::asInt) + .orElse(1024); + + long iterSize = Optional.ofNullable(jNode.get("iterSize")) + .map(JsonNode::asLong) + .orElse(-1L); + + CacheConfiguration<UUID, byte[]> cacheCfg = new CacheConfiguration<>(cacheName); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(2); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setIndexedTypes(UUID.class, byte[].class); + + ignite.getOrCreateCache(cacheCfg); + + long start = System.currentTimeMillis(); + + markInitialized(); + + workParallel(ignite, cacheName, iterSize, dataSize); + + recordResult("DURATION", System.currentTimeMillis() - start); + + markFinished(); + } + + /** */ + private void workParallel(Ignite ignite, String cacheName, long iterSize, int dataSize) { + int core = Runtime.getRuntime().availableProcessors() / 2; + + long iterCore = 0 < iterSize ? iterSize / core : iterSize; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("UuidDataStreamer-%d") + .setDaemon(true) + .build(); + + ExecutorService executors = Executors.newFixedThreadPool(core, threadFactory); + CountDownLatch latch = new CountDownLatch(core); + + for (int i = 0; i < core; i++) + executors.submit(new UuidDataStreamer(ignite, cacheName, latch, iterCore, dataSize)); + + try { + while (true) { + if (latch.await(1, TimeUnit.SECONDS) || terminated()) + break; + } + } + catch (InterruptedException e) { + markBroken(new RuntimeException("Unexpected thread interruption", e)); + + Thread.currentThread().interrupt(); + } + finally { + executors.shutdownNow(); + } + } + + /** */ + private class UuidDataStreamer implements Runnable { + /** Ignite. */ + private final Ignite ignite; + + /** Cache name. */ + private final String cacheName; + + /** Latch. */ + private final CountDownLatch latch; + + /** Iteration size. */ + private final long iterSize; + + /** Data size. */ + private final int dataSize; + + /** */ + public UuidDataStreamer(Ignite ignite, String cacheName, CountDownLatch latch, long iterSize, int dataSize) { + this.ignite = ignite; + this.cacheName = cacheName; + this.latch = latch; + this.iterSize = iterSize; + this.dataSize = dataSize; + } + + /** {@inheritDoc} */ + @Override public void run() { + long cnt = 0L; + + try (IgniteDataStreamer<UUID, byte[]> dataStreamer = ignite.dataStreamer(cacheName)) { + dataStreamer.autoFlushFrequency(100L); Review comment: what the reason to specify this? ########## File path: modules/ducktests/tests/ignitetest/services/utils/control_utility.py ########## @@ -127,6 +132,106 @@ def tx_kill(self, **kwargs): res = self.__parse_tx_list(output) return res if res else output + def validate_indexes(self, check_assert: bool = None): + """ + Validate indexes. + If check is true, will be return + """ + data = self.__run("--cache validate_indexes") + + if check_assert is not None: + assert (('no issues found.' in data) == check_assert), data + + return data Review comment: any reason to have this return? ########## File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/UuidStreamerApplication.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Loading random uuids in cache. + */ +public class UuidStreamerApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jNode) { + String cacheName = jNode.get("cacheName").asText(); + + int dataSize = Optional.ofNullable(jNode.get("dataSize")) + .map(JsonNode::asInt) + .orElse(1024); + + long iterSize = Optional.ofNullable(jNode.get("iterSize")) + .map(JsonNode::asLong) + .orElse(-1L); + + CacheConfiguration<UUID, byte[]> cacheCfg = new CacheConfiguration<>(cacheName); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(2); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setIndexedTypes(UUID.class, byte[].class); + + ignite.getOrCreateCache(cacheCfg); + + long start = System.currentTimeMillis(); + + markInitialized(); + + workParallel(ignite, cacheName, iterSize, dataSize); + + recordResult("DURATION", System.currentTimeMillis() - start); + + markFinished(); + } + + /** */ + private void workParallel(Ignite ignite, String cacheName, long iterSize, int dataSize) { + int core = Runtime.getRuntime().availableProcessors() / 2; + + long iterCore = 0 < iterSize ? iterSize / core : iterSize; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("UuidDataStreamer-%d") + .setDaemon(true) + .build(); + + ExecutorService executors = Executors.newFixedThreadPool(core, threadFactory); + CountDownLatch latch = new CountDownLatch(core); + + for (int i = 0; i < core; i++) + executors.submit(new UuidDataStreamer(ignite, cacheName, latch, iterCore, dataSize)); + + try { + while (true) { + if (latch.await(1, TimeUnit.SECONDS) || terminated()) + break; + } + } + catch (InterruptedException e) { + markBroken(new RuntimeException("Unexpected thread interruption", e)); + + Thread.currentThread().interrupt(); + } + finally { + executors.shutdownNow(); + } + } + + /** */ + private class UuidDataStreamer implements Runnable { + /** Ignite. */ + private final Ignite ignite; + + /** Cache name. */ + private final String cacheName; + + /** Latch. */ + private final CountDownLatch latch; + + /** Iteration size. */ + private final long iterSize; + + /** Data size. */ + private final int dataSize; + + /** */ + public UuidDataStreamer(Ignite ignite, String cacheName, CountDownLatch latch, long iterSize, int dataSize) { + this.ignite = ignite; + this.cacheName = cacheName; + this.latch = latch; + this.iterSize = iterSize; + this.dataSize = dataSize; + } + + /** {@inheritDoc} */ + @Override public void run() { + long cnt = 0L; + + try (IgniteDataStreamer<UUID, byte[]> dataStreamer = ignite.dataStreamer(cacheName)) { + dataStreamer.autoFlushFrequency(100L); + + while (cnt != iterSize && !Thread.currentThread().isInterrupted()) { + UUID uuid = UUID.randomUUID(); + + byte[] data = new byte[dataSize]; + + ThreadLocalRandom.current().nextBytes(data); + + dataStreamer.addData(uuid, data); + + cnt++; + } + + dataStreamer.flush(); Review comment: Will ds do this automatically on close? ########## File path: modules/ducktests/tests/ignitetest/services/utils/control_utility.py ########## @@ -127,6 +132,106 @@ def tx_kill(self, **kwargs): res = self.__parse_tx_list(output) return res if res else output + def validate_indexes(self, check_assert: bool = None): + """ + Validate indexes. + If check is true, will be return + """ + data = self.__run("--cache validate_indexes") + + if check_assert is not None: + assert (('no issues found.' in data) == check_assert), data + + return data + + def idle_verify(self, check_assert: bool = None): + """ + Idle verify. + """ + data = self.__run("--cache idle_verify") + + if check_assert is not None: + assert (('idle_verify check has finished, no conflicts have been found.' in data) == check_assert), data + + return data + + def idle_verify_dump(self, node=None, return_path: bool = False): + """ + Idle verify dump. + """ + data = self.__run("--cache idle_verify --dump", node=node) + + if return_path & ('VisorIdleVerifyDumpTask successfully' in data): + match = re.search(r'/.*.txt', data) + return match[0] + + return data + + def snapshot_create(self, snapshot_name: str, sync_mode: bool = True, time_out: int = 60): + """ + Create snapshot. + """ + res = self.__run(f"--snapshot create {snapshot_name}") + + self.logger.info(res) + + if ("Command [SNAPSHOT] finished with code: 0" in res) & sync_mode: + self.await_snapshot(snapshot_name=snapshot_name, time_out=time_out) + + return res Review comment: any reason to have this return? ########## File path: modules/ducktests/tests/ignitetest/services/utils/control_utility.py ########## @@ -127,6 +132,106 @@ def tx_kill(self, **kwargs): res = self.__parse_tx_list(output) return res if res else output + def validate_indexes(self, check_assert: bool = None): + """ + Validate indexes. + If check is true, will be return + """ + data = self.__run("--cache validate_indexes") + + if check_assert is not None: + assert (('no issues found.' in data) == check_assert), data + + return data + + def idle_verify(self, check_assert: bool = None): + """ + Idle verify. + """ + data = self.__run("--cache idle_verify") + + if check_assert is not None: + assert (('idle_verify check has finished, no conflicts have been found.' in data) == check_assert), data + + return data Review comment: any reason to have this return? ########## File path: modules/ducktests/tests/ignitetest/services/utils/control_utility.py ########## @@ -127,6 +132,106 @@ def tx_kill(self, **kwargs): res = self.__parse_tx_list(output) return res if res else output + def validate_indexes(self, check_assert: bool = None): + """ + Validate indexes. + If check is true, will be return + """ + data = self.__run("--cache validate_indexes") + + if check_assert is not None: + assert (('no issues found.' in data) == check_assert), data + + return data + + def idle_verify(self, check_assert: bool = None): + """ + Idle verify. + """ + data = self.__run("--cache idle_verify") + + if check_assert is not None: + assert (('idle_verify check has finished, no conflicts have been found.' in data) == check_assert), data + + return data + + def idle_verify_dump(self, node=None, return_path: bool = False): + """ + Idle verify dump. + """ + data = self.__run("--cache idle_verify --dump", node=node) + + if return_path & ('VisorIdleVerifyDumpTask successfully' in data): + match = re.search(r'/.*.txt', data) + return match[0] + + return data + + def snapshot_create(self, snapshot_name: str, sync_mode: bool = True, time_out: int = 60): + """ + Create snapshot. + """ + res = self.__run(f"--snapshot create {snapshot_name}") + + self.logger.info(res) + + if ("Command [SNAPSHOT] finished with code: 0" in res) & sync_mode: + self.await_snapshot(snapshot_name=snapshot_name, time_out=time_out) + + return res + + def snapshot_cancel(self, snapshot_name: str): Review comment: no usages found ########## File path: modules/ducktests/tests/ignitetest/services/utils/control_utility.py ########## @@ -127,6 +132,106 @@ def tx_kill(self, **kwargs): res = self.__parse_tx_list(output) return res if res else output + def validate_indexes(self, check_assert: bool = None): + """ + Validate indexes. + If check is true, will be return + """ + data = self.__run("--cache validate_indexes") + + if check_assert is not None: + assert (('no issues found.' in data) == check_assert), data + + return data + + def idle_verify(self, check_assert: bool = None): + """ + Idle verify. + """ + data = self.__run("--cache idle_verify") + + if check_assert is not None: + assert (('idle_verify check has finished, no conflicts have been found.' in data) == check_assert), data + + return data + + def idle_verify_dump(self, node=None, return_path: bool = False): + """ + Idle verify dump. + """ + data = self.__run("--cache idle_verify --dump", node=node) + + if return_path & ('VisorIdleVerifyDumpTask successfully' in data): + match = re.search(r'/.*.txt', data) + return match[0] + + return data + + def snapshot_create(self, snapshot_name: str, sync_mode: bool = True, time_out: int = 60): + """ + Create snapshot. + """ + res = self.__run(f"--snapshot create {snapshot_name}") + + self.logger.info(res) + + if ("Command [SNAPSHOT] finished with code: 0" in res) & sync_mode: + self.await_snapshot(snapshot_name=snapshot_name, time_out=time_out) + + return res + + def snapshot_cancel(self, snapshot_name: str): + """ + Cancel snapshot. + """ + return self.__run(f"--snapshot cancel {snapshot_name}") + + def snapshot_kill(self, snapshot_name: str): Review comment: no usages found ########## File path: modules/ducktests/tests/ignitetest/services/utils/control_utility.py ########## @@ -127,6 +132,106 @@ def tx_kill(self, **kwargs): res = self.__parse_tx_list(output) return res if res else output + def validate_indexes(self, check_assert: bool = None): + """ + Validate indexes. + If check is true, will be return + """ + data = self.__run("--cache validate_indexes") + + if check_assert is not None: + assert (('no issues found.' in data) == check_assert), data + + return data + + def idle_verify(self, check_assert: bool = None): + """ + Idle verify. + """ + data = self.__run("--cache idle_verify") + + if check_assert is not None: + assert (('idle_verify check has finished, no conflicts have been found.' in data) == check_assert), data + + return data + + def idle_verify_dump(self, node=None, return_path: bool = False): + """ + Idle verify dump. + """ + data = self.__run("--cache idle_verify --dump", node=node) + + if return_path & ('VisorIdleVerifyDumpTask successfully' in data): + match = re.search(r'/.*.txt', data) + return match[0] + + return data + + def snapshot_create(self, snapshot_name: str, sync_mode: bool = True, time_out: int = 60): + """ + Create snapshot. + """ + res = self.__run(f"--snapshot create {snapshot_name}") + + self.logger.info(res) + + if ("Command [SNAPSHOT] finished with code: 0" in res) & sync_mode: + self.await_snapshot(snapshot_name=snapshot_name, time_out=time_out) + + return res + + def snapshot_cancel(self, snapshot_name: str): + """ + Cancel snapshot. + """ + return self.__run(f"--snapshot cancel {snapshot_name}") + + def snapshot_kill(self, snapshot_name: str): + """ + Kill snapshot. + """ + return self.__run(f"--kill SNAPSHOT {snapshot_name}") + + def await_snapshot(self, snapshot_name: str, time_out=60): + """ + Waiting for the snapshot to complete. + """ + delta_time = datetime.now() + timedelta(seconds=time_out) + + while datetime.now() < delta_time: + for node in self._cluster.nodes: + mbean = JmxClient(node).find_mbean('snapshot') + start_time = int(list(mbean.__getattr__('LastSnapshotStartTime'))[0]) + end_time = int(list(mbean.__getattr__('LastSnapshotEndTime'))[0]) + err_msg = list(mbean.__getattr__('LastSnapshotErrorMessage'))[0] + + self.logger.debug(f'Hostname={node.account.hostname}, ' + f'LastSnapshotStartTime={start_time}, ' + f'LastSnapshotEndTime={end_time}, ' + f'LastSnapshotErrorMessage={err_msg}' + ) + + if (0 < start_time < end_time) & (err_msg == ''): + self.print_snapshot_size(snapshot_name) + return + + time.sleep(1) + + raise TimeoutError(f'LastSnapshotStartTime={start_time}, ' + f'LastSnapshotEndTime={end_time}, ' + f'LastSnapshotErrorMessage={err_msg}') + + def print_snapshot_size(self, snapshot_name: str): Review comment: should be inlined? what's the reason for such printing? it will not check anything at automated check& ########## File path: modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/UuidStreamerApplication.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Loading random uuids in cache. + */ +public class UuidStreamerApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jNode) { + String cacheName = jNode.get("cacheName").asText(); + + int dataSize = Optional.ofNullable(jNode.get("dataSize")) + .map(JsonNode::asInt) + .orElse(1024); + + long iterSize = Optional.ofNullable(jNode.get("iterSize")) + .map(JsonNode::asLong) + .orElse(-1L); + + CacheConfiguration<UUID, byte[]> cacheCfg = new CacheConfiguration<>(cacheName); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setBackups(2); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setIndexedTypes(UUID.class, byte[].class); + + ignite.getOrCreateCache(cacheCfg); + + long start = System.currentTimeMillis(); + + markInitialized(); + + workParallel(ignite, cacheName, iterSize, dataSize); + + recordResult("DURATION", System.currentTimeMillis() - start); + + markFinished(); + } + + /** */ + private void workParallel(Ignite ignite, String cacheName, long iterSize, int dataSize) { + int core = Runtime.getRuntime().availableProcessors() / 2; + + long iterCore = 0 < iterSize ? iterSize / core : iterSize; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("UuidDataStreamer-%d") + .setDaemon(true) + .build(); + + ExecutorService executors = Executors.newFixedThreadPool(core, threadFactory); + CountDownLatch latch = new CountDownLatch(core); + + for (int i = 0; i < core; i++) + executors.submit(new UuidDataStreamer(ignite, cacheName, latch, iterCore, dataSize)); + + try { + while (true) { + if (latch.await(1, TimeUnit.SECONDS) || terminated()) + break; + } + } + catch (InterruptedException e) { + markBroken(new RuntimeException("Unexpected thread interruption", e)); + + Thread.currentThread().interrupt(); + } + finally { + executors.shutdownNow(); + } + } + + /** */ + private class UuidDataStreamer implements Runnable { + /** Ignite. */ + private final Ignite ignite; + + /** Cache name. */ + private final String cacheName; + + /** Latch. */ + private final CountDownLatch latch; + + /** Iteration size. */ + private final long iterSize; + + /** Data size. */ + private final int dataSize; + + /** */ + public UuidDataStreamer(Ignite ignite, String cacheName, CountDownLatch latch, long iterSize, int dataSize) { + this.ignite = ignite; + this.cacheName = cacheName; + this.latch = latch; + this.iterSize = iterSize; + this.dataSize = dataSize; + } + + /** {@inheritDoc} */ + @Override public void run() { + long cnt = 0L; + + try (IgniteDataStreamer<UUID, byte[]> dataStreamer = ignite.dataStreamer(cacheName)) { + dataStreamer.autoFlushFrequency(100L); + + while (cnt != iterSize && !Thread.currentThread().isInterrupted()) { Review comment: waiting for interruption sounds like a bad design. ########## File path: modules/ducktests/tests/ignitetest/services/utils/control_utility.py ########## @@ -127,6 +132,106 @@ def tx_kill(self, **kwargs): res = self.__parse_tx_list(output) return res if res else output + def validate_indexes(self, check_assert: bool = None): + """ + Validate indexes. + If check is true, will be return + """ + data = self.__run("--cache validate_indexes") + + if check_assert is not None: + assert (('no issues found.' in data) == check_assert), data + + return data + + def idle_verify(self, check_assert: bool = None): + """ + Idle verify. + """ + data = self.__run("--cache idle_verify") + + if check_assert is not None: + assert (('idle_verify check has finished, no conflicts have been found.' in data) == check_assert), data + + return data + + def idle_verify_dump(self, node=None, return_path: bool = False): + """ + Idle verify dump. + """ + data = self.__run("--cache idle_verify --dump", node=node) + + if return_path & ('VisorIdleVerifyDumpTask successfully' in data): + match = re.search(r'/.*.txt', data) + return match[0] + + return data + + def snapshot_create(self, snapshot_name: str, sync_mode: bool = True, time_out: int = 60): + """ + Create snapshot. + """ + res = self.__run(f"--snapshot create {snapshot_name}") + + self.logger.info(res) + + if ("Command [SNAPSHOT] finished with code: 0" in res) & sync_mode: + self.await_snapshot(snapshot_name=snapshot_name, time_out=time_out) + + return res + + def snapshot_cancel(self, snapshot_name: str): + """ + Cancel snapshot. + """ + return self.__run(f"--snapshot cancel {snapshot_name}") + + def snapshot_kill(self, snapshot_name: str): + """ + Kill snapshot. + """ + return self.__run(f"--kill SNAPSHOT {snapshot_name}") + + def await_snapshot(self, snapshot_name: str, time_out=60): + """ + Waiting for the snapshot to complete. + """ + delta_time = datetime.now() + timedelta(seconds=time_out) + + while datetime.now() < delta_time: + for node in self._cluster.nodes: + mbean = JmxClient(node).find_mbean('snapshot') + start_time = int(list(mbean.__getattr__('LastSnapshotStartTime'))[0]) + end_time = int(list(mbean.__getattr__('LastSnapshotEndTime'))[0]) + err_msg = list(mbean.__getattr__('LastSnapshotErrorMessage'))[0] + + self.logger.debug(f'Hostname={node.account.hostname}, ' Review comment: a lot of debug messages... ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
