ibessonov commented on a change in pull request #9527:
URL: https://github.com/apache/ignite/pull/9527#discussion_r755086914
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -283,10 +290,15 @@ public boolean diagnosticEnabled() {
try {
ClusterIdAndTag idAndTag = new
ClusterIdAndTag(cluster.id(), cluster.tag());
- if (log.isInfoEnabled())
- log.info("Writing cluster ID and tag to metastorage on
ready for write " + idAndTag);
+ if (idAndTag.id() != null) {
+ metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
- metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
+ if (log.isInfoEnabled())
+ log.info("Writing cluster ID and tag to
metastorage on ready for write " + idAndTag);
+ }
+ else
Review comment:
Please add braces
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -201,6 +213,9 @@ public void cancel(boolean halt) throws
InterruptedException {
try {
RunnableFuture<?> curTask = updateQueue.take();
+ if (writeCondition != null && writeCondition.test(""))
Review comment:
You forgot to remove this part I think. Or, if it is correct, then
please document it.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -329,10 +339,51 @@ public void updateTag(String newTag) throws
IgniteCheckedException {
* </ul>
*/
public void onLocalJoin() {
- cluster.setId(locClusterId != null ? locClusterId : UUID.randomUUID());
+ Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+ List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+ rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+ @Nullable Collection<BaselineNode> bltNodes =
cluster.currentBaselineTopology();
+
+ if (F.isEmpty(bltNodes)) {
+ log.info("Baseline node collection is empty.");
+
+ return;
+ }
+
+ @Nullable UUID first = null;
+
+ Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+ for (ClusterNode node : rmtNodes0) {
+ if (F.contains(srvIds, node.consistentId())) {
+ first = node.id();
+
+ break;
+ }
+ }
+
+ ClusterNode locNode = ctx.config().getDiscoverySpi().getLocalNode();
+
+ if (first == locNode.id() || locClusterId != null) {
+ cluster.setId(locClusterId != null ? locClusterId :
UUID.randomUUID());
+
+ cluster.setTag(locClusterTag != null ? locClusterTag :
+ ClusterTagGenerator.generateTag());
- cluster.setTag(locClusterTag != null ? locClusterTag :
- ClusterTagGenerator.generateTag());
+ ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(),
cluster.tag());
+
+ if (log.isInfoEnabled())
+ log.info("Writing cluster ID and tag to metastorage on ready
for write " + idAndTag);
+
+ try {
+ metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
+ }
+ catch (IgniteCheckedException e) {
+ ctx.failure().process(new
FailureContext(FailureType.CRITICAL_ERROR, e));
Review comment:
> What if this node actually fails?
I'm still waiting for the reply btw.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -329,10 +339,51 @@ public void updateTag(String newTag) throws
IgniteCheckedException {
* </ul>
*/
public void onLocalJoin() {
- cluster.setId(locClusterId != null ? locClusterId : UUID.randomUUID());
+ Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+ List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+ rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+ @Nullable Collection<BaselineNode> bltNodes =
cluster.currentBaselineTopology();
+
+ if (F.isEmpty(bltNodes)) {
+ log.info("Baseline node collection is empty.");
+
+ return;
+ }
+
+ @Nullable UUID first = null;
+
+ Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+ for (ClusterNode node : rmtNodes0) {
+ if (F.contains(srvIds, node.consistentId())) {
+ first = node.id();
+
+ break;
+ }
+ }
+
+ ClusterNode locNode = ctx.config().getDiscoverySpi().getLocalNode();
+
+ if (first == locNode.id() || locClusterId != null) {
+ cluster.setId(locClusterId != null ? locClusterId :
UUID.randomUUID());
+
+ cluster.setTag(locClusterTag != null ? locClusterTag :
+ ClusterTagGenerator.generateTag());
- cluster.setTag(locClusterTag != null ? locClusterTag :
- ClusterTagGenerator.generateTag());
+ ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(),
cluster.tag());
+
+ if (log.isInfoEnabled())
+ log.info("Writing cluster ID and tag to metastorage on ready
for write " + idAndTag);
+
+ try {
+ metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
+ }
+ catch (IgniteCheckedException e) {
+ ctx.failure().process(new
FailureContext(FailureType.CRITICAL_ERROR, e));
Review comment:
I don't get it, CAS is not that different from regular write. What do
you mean?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -328,11 +340,67 @@ public void updateTag(String newTag) throws
IgniteCheckedException {
* when it becomes ready for read.</li>
* </ul>
*/
+ public void onChangeState() {
+ Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+ List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+ rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+ @Nullable Collection<BaselineNode> bltNodes =
cluster.currentBaselineTopology();
+
+ if (F.isEmpty(bltNodes)) {
+ log.info("Baseline node collection is empty.");
+
+ return;
+ }
+
+ @Nullable UUID first = null;
+
+ Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+ for (ClusterNode node : rmtNodes0) {
+ if (F.contains(srvIds, node.consistentId())) {
+ first = node.id();
+
+ break;
+ }
+ }
+
+ ClusterNode locNode = ctx.config().getDiscoverySpi().getLocalNode();
+
+ if (first == locNode.id() || locClusterTag != null) {
+ final UUID id = locClusterId != null ? locClusterId :
UUID.randomUUID();
+
+ final String tag = locClusterTag != null ? locClusterTag :
ClusterTagGenerator.generateTag();
+
+ ClusterIdAndTag idAndTag = new ClusterIdAndTag(id, tag);
+
+ if (log.isInfoEnabled() && locClusterTag == null)
+ log.info("Writing cluster ID and tag to metastorage on ready
for write " + idAndTag);
+
+ try {
+ GridFutureAdapter<?> f =
metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
+
+ f.listen(o -> {
+ cluster.setId(id);
+ cluster.setTag(tag);
+ });
+ }
+ catch (IgniteCheckedException e) {
+ ctx.failure().process(new
FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
+ }
+ }
+
+ /** For pure in mem cluster we still need to generate tag and id on first
registred cluster node. */
Review comment:
```suggestion
/** For pure in mem cluster we still need to generate tag and id on
first registered cluster node. */
```
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -283,10 +290,15 @@ public boolean diagnosticEnabled() {
try {
ClusterIdAndTag idAndTag = new
ClusterIdAndTag(cluster.id(), cluster.tag());
- if (log.isInfoEnabled())
- log.info("Writing cluster ID and tag to metastorage on
ready for write " + idAndTag);
+ if (idAndTag.id() != null) {
+ metastorage.writeAsync(CLUSTER_ID_TAG_KEY, idAndTag);
Review comment:
I think that this code deserves a comment. Why are we sure that tag&id
don't exist in cluster and we can "write" instead of "cas"?
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
##########
@@ -73,19 +96,174 @@
public void testRestart() throws Exception {
IgniteEx ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().state(ClusterState.ACTIVE);
ignite.context().distributedMetastorage().write("key", "value");
stopGrid(0);
ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().state(ClusterState.ACTIVE);
assertEquals("value",
ignite.context().distributedMetastorage().read("key"));
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStoreLagOnOneNode() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ IgniteEx ignite2 = startGrid(1);
+
+ DistributedMetaStorageImpl distrMetaStore =
(DistributedMetaStorageImpl)ignite.context().distributedMetastorage();
+
+ DmsDataWriterWorker worker =
GridTestUtils.getFieldValue(distrMetaStore, "worker");
+
+ assertNotNull(worker);
+
+ // check we still have no cluster tag key.
+ assertNull(distrMetaStore.read(CLUSTER_ID_TAG_KEY));
+
+ GridTestUtils.setFieldValue(worker, "writeCondition", new
Predicate<String>() {
+ private volatile boolean skip;
+
+ @Override public boolean test(String s) {
+ if (s.equals(CLUSTER_ID_TAG_KEY) || skip) {
+ skip = true;
+
+ return true;
+ }
+ return false;
+ }
+ });
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ String clusterTag = "griffon";
+
+ assertTrue(GridTestUtils.waitForCondition(() -> {
+ boolean fail = false;
Review comment:
This can be replaces with `return true;` in `try` and `return false;` in
`catch`.
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
##########
@@ -73,19 +96,174 @@
public void testRestart() throws Exception {
IgniteEx ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().state(ClusterState.ACTIVE);
ignite.context().distributedMetastorage().write("key", "value");
stopGrid(0);
ignite = startGrid(0);
- ignite.cluster().active(true);
+ ignite.cluster().state(ClusterState.ACTIVE);
assertEquals("value",
ignite.context().distributedMetastorage().read("key"));
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStoreLagOnOneNode() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ IgniteEx ignite2 = startGrid(1);
+
+ DistributedMetaStorageImpl distrMetaStore =
(DistributedMetaStorageImpl)ignite.context().distributedMetastorage();
+
+ DmsDataWriterWorker worker =
GridTestUtils.getFieldValue(distrMetaStore, "worker");
+
+ assertNotNull(worker);
+
+ // check we still have no cluster tag key.
+ assertNull(distrMetaStore.read(CLUSTER_ID_TAG_KEY));
+
+ GridTestUtils.setFieldValue(worker, "writeCondition", new
Predicate<String>() {
+ private volatile boolean skip;
+
+ @Override public boolean test(String s) {
+ if (s.equals(CLUSTER_ID_TAG_KEY) || skip) {
+ skip = true;
+
+ return true;
+ }
+ return false;
+ }
+ });
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ String clusterTag = "griffon";
+
+ assertTrue(GridTestUtils.waitForCondition(() -> {
+ boolean fail = false;
+
+ try {
+ ignite2.cluster().tag(clusterTag);
+ }
+ catch (IgniteCheckedException e) {
+ assertTrue(e.getMessage().contains("Cannot change tag as
default"));
+
+ fail = true;
+ }
+
+ return !fail;
+ }, 10_000));
+
+ String tag0 = ignite2.cluster().tag();
+
+ String key = "some_kind_of_uniq_key_" +
ThreadLocalRandom.current().nextInt();
+
+ checkStoredWithPers(metastorage(0), ignite2, key, "value");
+
+ stopAllGrids();
+
+ ignite = startGrid(0);
+
+ startGrid(1);
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ assertEquals("value",
ignite.context().distributedMetastorage().read(key));
+
+ assertEquals(tag0, ignite.cluster().tag());
+ }
+
+ /** Check cluster tag behaviour while one node fails. */
+ @Test
+ public void changeTagWithNodeCrash() throws Exception {
+ String clusterTag = "seamonkey";
+
+ IgniteEx ignite = startGrid(0);
+
+ IgniteEx ignite2 = startGrid(1);
+
+ Collection<ClusterNode> rmtNodes =
ignite.cluster().forServers().nodes();
+
+ List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+ rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+ // Choose a node the same as in ClusterProcessor.onChangeState.
+ @Nullable UUID first = F.first(rmtNodes0).id();
+
+ assertNotNull(first);
+
+ IgniteEx activeNode = ignite.cluster().localNode().id() == first ?
ignite : ignite2;
+
+ System.err.println("node to skip: " + activeNode.name());
+
+ assertEquals(activeNode.cluster().localNode().id(), first);
+
+ DistributedMetaStorageImpl distrMetaStore =
+
(DistributedMetaStorageImpl)activeNode.context().distributedMetastorage();
+
+ ClusterProcessor proc = activeNode.context().cluster();
+
+ AtomicBoolean fail = new AtomicBoolean();
+
+ DistributedMetaStorageDelegate delegate = new
DistributedMetaStorageDelegate(distrMetaStore, fail);
+
+ DmsDataWriterWorker worker =
GridTestUtils.getFieldValue(distrMetaStore, "worker");
+
+ GridTestUtils.setFieldValue(proc, "metastorage", delegate);
+
+ GridTestUtils.setFieldValue(worker, "writeCondition", new
Predicate<String>() {
+ @Override public boolean test(String s) {
+ if (s.equals(CLUSTER_ID_TAG_KEY) || fail.get()) {
+ fail.set(true);
+
+ return true;
+ }
+ return false;
+ }
+ });
+
+ IgniteEx alive = activeNode.name().equals(ignite.name()) ? ignite2 :
ignite;
+
+ alive.cluster().state(ClusterState.ACTIVE);
+
+ assertTrue(GridTestUtils.waitForCondition(() -> {
+ boolean failed = false;
Review comment:
Same here, `return !failed;` just looks complicated
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/cluster/IgniteClusterIdTagTest.java
##########
@@ -424,4 +444,63 @@ public void testTagChangedEventMultinodeWithRemoteFilter()
throws Exception {
assertEquals(generatedTag, oldTagFromEvent.get());
assertEquals(CUSTOM_TAG_0, newTagFromEvent.get());
}
+
+ /**
+ * @return {@link DistributedMetaStorage} instance for i'th node.
+ */
+ protected DistributedMetaStorage metastorage(int i) {
+ return grid(i).context().distributedMetastorage();
+ }
+
+ /** Checks that appropriate key, value are stored into local metastore. */
+ protected void checkStoredWithPers(
+ DistributedMetaStorage msToStore,
+ IgniteEx instanceToCheck,
+ String key,
+ String value
+ ) throws IgniteCheckedException {
+ assertTrue(isPersistenceEnabled);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final DistributedMetaStorageImpl distrMetaStore =
+
(DistributedMetaStorageImpl)instanceToCheck.context().distributedMetastorage();
+
+ DmsDataWriterWorker worker =
GridTestUtils.getFieldValue(distrMetaStore, "worker");
+
+ ReadWriteMetastorage metastorage = GridTestUtils.getFieldValue(worker,
"metastorage");
+
+ assertNotNull(metastorage);
+
+ IgniteInternalFuture f = GridTestUtils.runAsync(() -> {
+ try {
+ latch.await();
+
+ assertTrue(waitForCondition(() -> {
+ try {
+ AtomicReference<String> contains = new
AtomicReference<>();
+
+ metastorage.iterate("", (k, v) -> {
+ if (k.contains(key))
+ contains.set(k);
+ }, false);
+
+ return contains.get() != null &&
metastorage.readRaw(contains.get()) != null;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }, 15_000));
+ }
+ catch (IgniteInterruptedCheckedException | InterruptedException e)
{
+ throw new IgniteException(e);
+ }
+ });
+
+ latch.countDown();
+
+ msToStore.write(key, value);
+
+ f.get();
Review comment:
Please add timeout
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
##########
@@ -328,11 +340,67 @@ public void updateTag(String newTag) throws
IgniteCheckedException {
* when it becomes ready for read.</li>
* </ul>
*/
+ public void onChangeState() {
+ Collection<ClusterNode> rmtNodes = cluster.forServers().nodes();
+ List<ClusterNode> rmtNodes0 = new ArrayList<>(rmtNodes);
+
+ rmtNodes0.sort(Comparator.comparing(ClusterNode::id));
+
+ @Nullable Collection<BaselineNode> bltNodes =
cluster.currentBaselineTopology();
+
+ if (F.isEmpty(bltNodes)) {
+ log.info("Baseline node collection is empty.");
+
+ return;
+ }
+
+ @Nullable UUID first = null;
+
+ Collection<Object> srvIds = F.nodeConsistentIds(bltNodes);
+
+ for (ClusterNode node : rmtNodes0) {
+ if (F.contains(srvIds, node.consistentId())) {
+ first = node.id();
Review comment:
A perfect opportunity to use Stream API, don't you think so? Take a look
at `java.util.stream.Stream#min`, for example
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]