[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2fc690e6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2fc690e6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2fc690e6 Branch: refs/heads/ignite-zk-join Commit: 2fc690e64c5a75607eb4a8542cf800a8290cda3c Parents: ec75bba Author: sboikovAuthored: Thu Jan 11 17:52:53 2018 +0300 Committer: sboikov Committed: Thu Jan 11 17:52:53 2018 +0300 -- .../zk/internal/ZkBulkJoinContext.java | 50 ++ .../internal/ZkDiscoveryNodeJoinEventData.java | 44 +- .../discovery/zk/internal/ZkIgnitePaths.java| 6 +- .../zk/internal/ZkJoinEventDataForJoined.java | 42 +- .../zk/internal/ZkJoinedNodeEvtData.java| 79 .../zk/internal/ZookeeperDiscoveryImpl.java | 469 +++ .../zk/internal/ZookeeperDiscoverySpiTest.java | 38 +- 7 files changed, 491 insertions(+), 237 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java new file mode 100644 index 000..a186aed --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.ignite.internal.util.typedef.T2; + +/** + * + */ +class ZkBulkJoinContext { +/** */ +List >> nodes; + +/** + * @param nodeEvtData Node event data. + * @param discoData Discovery data for node. + */ +void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map discoData) { +if (nodes == null) +nodes = new ArrayList<>(); + +nodes.add(new T2<>(nodeEvtData, discoData)); +} + +/** + * @return Number of joined nodes. + */ +int nodes() { +return nodes != null ? 
nodes.size() : 0; +} +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2fc690e6/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java index ff75d22..e46d52d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java @@ -17,7 +17,7 @@ package org.apache.ignite.spi.discovery.zk.internal; -import java.util.UUID; +import java.util.List; /** * @@ -27,53 +27,27 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { private static final long serialVersionUID = 0L; /** */ -final long joinedInternalId; - -/** */ -final UUID nodeId; - -/** */ -final int joinDataPartCnt; +final List joinedNodes; /** */ final int dataForJoinedPartCnt; -/** */ -final int secSubjPartCnt; - -/** */ -final UUID joinDataPrefixId; - -/** */ -transient ZkJoiningNodeData joiningNodeData; - /** * @param evtId Event ID. * @param topVer Topology version. - * @param nodeId Joined node ID. - * @param joinedInternalId Joined node internal ID. - * @param joinDataPrefixId Join data unique
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b78f318 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b78f318 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b78f318 Branch: refs/heads/ignite-zk Commit: 0b78f318b12bb03f4ddce32e7f9c5d4eba8305ca Parents: 6961ddc Author: sboikovAuthored: Thu Dec 28 12:03:34 2017 +0300 Committer: sboikov Committed: Thu Dec 28 12:03:34 2017 +0300 -- .../DiscoveryMessageResultsCollector.java | 222 +++ .../continuous/GridContinuousProcessor.java | 191 +++- 2 files changed, 302 insertions(+), 111 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/0b78f318/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java new file mode 100644 index 000..72a4636 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class DiscoveryMessageResultsCollector { +/** */ +private final Map rcvd = new HashMap<>(); + +/** */ +private int leftMsgs; + +/** */ +protected DiscoCache discoCache; + +/** */ +protected final GridKernalContext ctx; + +/** + * @param ctx Context. + */ +protected DiscoveryMessageResultsCollector(GridKernalContext ctx) { +this.ctx = ctx; +} + +/** + * @param rcvd Received messages. + * @return Result. + */ +protected abstract R createResult(Map rcvd); + +/** + * @param r Result. + */ +protected abstract void onResultsCollected(R r); + +/** + * @param discoCache Discovery state when discovery message was received. + * @param node Node. + * @return {@code True} if need wait for result from given node. + */ +protected abstract boolean waitForNode(DiscoCache discoCache, ClusterNode node); + +/** + * @param discoCache Discovery state. + */ +public final void init(DiscoCache discoCache) { +assert discoCache != null; + +R res = null; + +synchronized (this) { +assert this.discoCache == null; +assert leftMsgs == 0 : leftMsgs; + +this.discoCache = discoCache; + +for (ClusterNode node : discoCache.allNodes()) { +if (ctx.discovery().alive(node) && waitForNode(discoCache, node) && !rcvd.containsKey(node.id())) { +rcvd.put(node.id(), new NodeMessage<>((M)null)); + +leftMsgs++; +} +} + +if (leftMsgs == 0) +res = createResult(rcvd); +} + +if (res != null) +onResultsCollected(res); +} + +/** + * @param nodeId Node ID. + * @param msg Message. 
+ */ +public final void onMessage(UUID nodeId, M msg) { +R res = null; + +synchronized (this) { +if (allReceived()) +return; + +NodeMessage expMsg = rcvd.get(nodeId); + +if (expMsg == null) +rcvd.put(nodeId, new NodeMessage<>(msg)); +else if (expMsg.set(msg)) { +assert
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52174ef7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52174ef7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52174ef7 Branch: refs/heads/ignite-zk Commit: 52174ef77761247dfefea7d1b90833ca816fb761 Parents: b37c35f Author: sboikovAuthored: Fri Dec 8 15:11:56 2017 +0300 Committer: sboikov Committed: Fri Dec 8 15:11:56 2017 +0300 -- .../processors/continuous/GridContinuousProcessor.java | 6 +++--- .../spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java| 8 +--- 2 files changed, 4 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/52174ef7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 0583a8a..2e5bf47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -2398,7 +2398,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { expRes++; } -if (expRes == res.size()) +if (res.size() >= expRes) res0 = createRegisterResults(); } @@ -2414,12 +2414,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { RoutineRegisterResults res0 = null; synchronized (res) { -if (res.containsKey(nodeId) || (topVer != null && res.size() == expRes)) +if (res.containsKey(nodeId) || (topVer != null && res.size() >= expRes)) return; res.put(nodeId, msg); -if (topVer != null && expRes == res.size()) +if (topVer != null && res.size() >= expRes) res0 = createRegisterResults(); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/52174ef7/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 105f71b..814406a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -2468,13 +2468,9 @@ public class ZookeeperDiscoveryImpl { return; try { -if (evt.getType() == Event.EventType.NodeDeleted) { +if (evt.getType() == Event.EventType.NodeDeleted) onPreviousNodeFail(); -} else { -if (log.isInfoEnabled()) -log.info("Previous node watch event: " + evt); - if (evt.getType() != Event.EventType.None) rtState.zkClient.existsAsync(evt.getPath(), this, this); } @@ -2492,8 +2488,6 @@ public class ZookeeperDiscoveryImpl { return; try { -log.info("Previous node stat callback [rc=" + rc + ", path=" + path + ", stat=" + stat + ']'); - assert rc == 0 || rc == KeeperException.Code.NONODE.intValue() : KeeperException.Code.get(rc); if (rc == KeeperException.Code.NONODE.intValue() || stat == null)
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e775be3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e775be3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e775be3 Branch: refs/heads/ignite-zk-alpha Commit: 1e775be39e2494e0e3427298d9bb9ddb93bd2179 Parents: c556b16 Author: sboikov Authored: Fri Dec 8 11:30:39 2017 +0300 Committer: sboikov Committed: Fri Dec 8 11:30:39 2017 +0300 -- .../ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/1e775be3/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 91d5fdd..caa60ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -718,7 +718,7 @@ public class ZookeeperDiscoveryImpl { join event after some timeout. */ rtState.joinTimeoutObj = new CheckJoinStateTimeoutObject( -multipartPathName(joinDataPath, 0), +joinDataPath, rtState); spi.getSpiContext().addTimeoutObject(rtState.joinTimeoutObj);
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6fb5603 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6fb5603 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6fb5603 Branch: refs/heads/ignite-zk-alpha Commit: f6fb5603bb9565b9e1067354c069624d5a379007 Parents: 2ffdd07 Author: sboikov Authored: Fri Dec 8 10:09:01 2017 +0300 Committer: sboikov Committed: Fri Dec 8 10:09:01 2017 +0300 -- .../ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java| 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/f6fb5603/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 2e97aed..91d5fdd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -706,6 +706,7 @@ public class ZookeeperDiscoveryImpl { ", instanceName=" + locNode.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) + ", joinDataSize=" + joinDataBytes.length + ", joinDataPartCnt=" + rtState.joinDataPartCnt + +", consistentId=" + locNode.consistentId() + ", initTime=" + (System.currentTimeMillis() - startTime) + ", nodePath=" + rtState.locNodeZkPath + ']');
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a11e06a5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a11e06a5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a11e06a5 Branch: refs/heads/ignite-zk Commit: a11e06a5bde8b5e25b37374504030423c0062183 Parents: 3d1c0e8 Author: sboikovAuthored: Thu Dec 7 12:49:31 2017 +0300 Committer: sboikov Committed: Thu Dec 7 12:49:31 2017 +0300 -- .../discovery/zk/internal/ZkIgnitePaths.java| 61 --- .../discovery/zk/internal/ZookeeperClient.java | 6 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 102 +++ .../ZookeeperDiscoverySpiBasicTest.java | 77 +- 4 files changed, 183 insertions(+), 63 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/a11e06a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index c9c0281..0d47658 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -30,13 +30,16 @@ class ZkIgnitePaths { private static final String JOIN_DATA_DIR = "jd"; /** */ -private static final String CUSTOM_EVTS_DIR = "c"; +private static final String CUSTOM_EVTS_DIR = "ce"; + +/** */ +private static final String CUSTOM_EVTS_PARTS_DIR = "cp"; /** */ private static final String CUSTOM_EVTS_ACKS_DIR = "ca"; /** */ -private static final String ALIVE_NODES_DIR = "n"; +static final String ALIVE_NODES_DIR = "n"; /** */ private static final String DISCO_EVENTS_PATH = "e"; @@ -57,6 +60,9 @@ class ZkIgnitePaths { final String customEvtsDir; /** */ +final String customEvtsPartsDir; + +/** */ final String customEvtsAcksDir; /** @@ -69,6 +75,7 @@ class 
ZkIgnitePaths { joinDataDir = zkPath(JOIN_DATA_DIR); evtsPath = zkPath(DISCO_EVENTS_PATH); customEvtsDir = zkPath(CUSTOM_EVTS_DIR); +customEvtsPartsDir = zkPath(CUSTOM_EVTS_PARTS_DIR); customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR); } @@ -201,7 +208,7 @@ class ZkIgnitePaths { * @return Event node ID. */ static UUID customEventSendNodeId(String path) { -// :| +// ::| int startIdx = ZkIgnitePaths.UUID_LEN + 1; String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN); @@ -209,6 +216,41 @@ class ZkIgnitePaths { return UUID.fromString(idStr); } +static String customEventPrefix(String path) { +// ::| + +return path.substring(0, ZkIgnitePaths.UUID_LEN); +} + +/** + * @param path Custom event zl path. + * @return Event node ID. + */ +static int customEventPartsCount(String path) { +// ::| +int startIdx = 2 * ZkIgnitePaths.UUID_LEN + 2; + +String cntStr = path.substring(startIdx, startIdx + 4); + +int partCnt = Integer.parseInt(cntStr); + +assert partCnt >= 1 : partCnt; + +return partCnt; +} + +String createCustomEventPath(String prefix, UUID nodeId, int partCnt) { +return customEvtsDir + "/" + prefix + ":" + nodeId + ":" + String.format("%04d", partCnt) + '|'; +} + +String customEventPartsBasePath(String prefix, UUID nodeId) { +return customEvtsPartsDir + "/" + prefix + ":" + nodeId + ":"; +} + +String customEventPartPath(String prefix, UUID nodeId, int part) { +return customEventPartsBasePath(prefix, nodeId) + String.format("%04d", part); +} + /** * @param evtId Event ID. * @return Event zk path. @@ -222,17 +264,6 @@ class ZkIgnitePaths { * @return Path for custom event ack. */ String ackEventDataPath(long evtId) { -return customEventDataPath(true, String.valueOf(evtId)); -} - -/** - * @param ack Ack event flag. - * @param child Event child path. - * @return Full event data path. - */ -String customEventDataPath(boolean ack, String child) { -String baseDir = ack ? 
customEvtsAcksDir : customEvtsDir; - -return baseDir + "/" + child; +return customEvtsAcksDir + "/" + String.valueOf(evtId); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a11e06a5/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbd5a889 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbd5a889 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbd5a889 Branch: refs/heads/ignite-zk Commit: bbd5a889fc980e36c8c500ef221a551e451ad854 Parents: 961167a Author: sboikovAuthored: Tue Dec 5 14:57:04 2017 +0300 Committer: sboikov Committed: Tue Dec 5 17:57:35 2017 +0300 -- .../internal/ZkDiscoveryNodeJoinEventData.java | 16 +- .../discovery/zk/internal/ZkIgnitePaths.java| 37 +-- .../zk/internal/ZkInternalJoinErrorMessage.java | 3 + .../zk/internal/ZkJoiningNodeData.java | 15 +- .../discovery/zk/internal/ZkRuntimeState.java | 3 + .../discovery/zk/internal/ZookeeperClient.java | 45 +++ .../zk/internal/ZookeeperDiscoveryImpl.java | 289 ++- .../zk/internal/ZookeeperClientTest.java| 58 ++-- 8 files changed, 328 insertions(+), 138 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java index df4c137..fbf1fc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java @@ -34,6 +34,12 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { final UUID nodeId; /** */ +final int joinDataPartCnt; + +/** */ +final UUID joinDataPrefixId; + +/** */ transient ZkJoiningNodeData joiningNodeData; /** @@ -42,11 +48,19 @@ class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData { * @param nodeId Joined node ID. 
* @param joinedInternalId Joined node internal ID. */ -ZkDiscoveryNodeJoinEventData(long evtId, long topVer, UUID nodeId, int joinedInternalId) { +ZkDiscoveryNodeJoinEventData(long evtId, +long topVer, +UUID nodeId, +int joinedInternalId, +UUID joinDataPrefixId, +int joinDataPartCnt) +{ super(evtId, EventType.EVT_NODE_JOINED, topVer); this.nodeId = nodeId; this.joinedInternalId = joinedInternalId; +this.joinDataPrefixId = joinDataPrefixId; +this.joinDataPartCnt = joinDataPartCnt; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd5a889/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 2478979..e52127a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -151,14 +151,8 @@ class ZkIgnitePaths { return clusterDir + "/" + path; } -String joiningNodeDataPath(UUID nodeId, String aliveNodePath) { -int joinSeq = ZkIgnitePaths.aliveJoinDataSequence(aliveNodePath); - -return joinDataDir + '/' + -ZkIgnitePaths.aliveNodePrefixId(aliveNodePath) + ":" + -nodeId.toString() + -"|" + -String.format("%010d", joinSeq); +String joiningNodeDataPath(UUID nodeId, UUID prefixId) { +return joinDataDir + '/' + prefixId + ":" + nodeId.toString(); } /** @@ -175,8 +169,8 @@ class ZkIgnitePaths { * @param path Alive node zk path. * @return Node ID. */ -static String aliveNodePrefixId(String path) { -return path.substring(0, ZkIgnitePaths.UUID_LEN); +static UUID aliveNodePrefixId(String path) { +return UUID.fromString(path.substring(0, ZkIgnitePaths.UUID_LEN)); } /** @@ -184,7 +178,7 @@ class ZkIgnitePaths { * @return Node ID. 
*/ static UUID aliveNodeId(String path) { -// :|| +// :| int startIdx = ZkIgnitePaths.UUID_LEN + 1; String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN); @@ -193,17 +187,6 @@ class ZkIgnitePaths { } /** - * @param path Alive node zk path. - * @return Joined node sequence. -
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eba59c38 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eba59c38 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eba59c38 Branch: refs/heads/ignite-zk Commit: eba59c38a774b41fcf6ff66e2f8eedc920f7ecc5 Parents: 4a8a7d0 Author: sboikovAuthored: Wed Nov 29 15:43:09 2017 +0300 Committer: sboikov Committed: Wed Nov 29 15:43:09 2017 +0300 -- .../apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java | 5 - .../discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java | 6 -- .../java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java | 3 ++- 3 files changed, 10 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/eba59c38/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index b036462..3c1f805 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -54,6 +54,9 @@ import org.jetbrains.annotations.Nullable; @DiscoverySpiHistorySupport(true) public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, IgniteDiscoverySpi { /** */ +public static final String DFLT_ROOT_PATH = "/apacheIgnite"; + +/** */ @GridToStringInclude private String zkConnectionString; @@ -63,7 +66,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** */ @GridToStringInclude -private String zkRootPath = "/apacheIgnite"; +private String zkRootPath = DFLT_ROOT_PATH; /** */ @GridToStringExclude 
http://git-wip-us.apache.org/repos/asf/ignite/blob/eba59c38/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index f98d66a..8ae84c1 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -81,7 +81,7 @@ import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; */ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ -private static final String IGNITE_ZK_ROOT = "/apacheIgnite"; +private static final String IGNITE_ZK_ROOT = ZookeeperDiscoverySpi.DFLT_ROOT_PATH; /** */ private static final int ZK_SRVS = 3; @@ -672,7 +672,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { try { -List c = zkClient.getChildren(IGNITE_ZK_ROOT + "/alive"); +List c = zkClient.getChildren(IGNITE_ZK_ROOT + "/n"); for (String failedZkNode : failedZkNodes) { if (c.contains(failedZkNode)) { @@ -685,6 +685,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { return true; } catch (Exception e) { +e.printStackTrace(); + fail(); return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/eba59c38/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java -- diff --git a/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java b/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java index fac405a..b8e1c80 100644 --- 
a/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java +++ b/modules/core/src/test/java/org/apache/zookeeper/ZkTestClientCnxnSocketNIO.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.util.concurrent.ConcurrentHashMap; import
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d175824b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d175824b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d175824b Branch: refs/heads/ignite-zk Commit: d175824b93899f9de2b21357b85f7814d94dce22 Parents: 36be0de Author: sboikovAuthored: Tue Nov 28 09:29:22 2017 +0300 Committer: sboikov Committed: Tue Nov 28 09:29:22 2017 +0300 -- .../discovery/zk/internal/ZookeeperDiscoveryImpl.java| 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/d175824b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 76eb306..ba475ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -657,12 +657,17 @@ public class ZookeeperDiscoveryImpl { private void saveAndProcessNewEvents() throws Exception { long start = System.currentTimeMillis(); +byte [] evtsBytes = marsh.marshal(state.evtsData); + state.zkClient.setData(zkPaths.evtsPath, marsh.marshal(state.evtsData), -1); long time = System.currentTimeMillis() - start; -if (log.isInfoEnabled()) -log.info("Discovery coordinator saved new topology events [topVer=" + state.evtsData.topVer + ", saveTime=" + time + ']'); +if (log.isInfoEnabled()) { +log.info("Discovery coordinator saved new topology events [topVer=" + state.evtsData.topVer + +", size=" + evtsBytes.length + +", saveTime=" + time + ']'); +} processNewEvents(state.evtsData); } @@ -988,7 +993,7 @@ public class ZookeeperDiscoveryImpl { } else { if 
(log.isInfoEnabled()) -log.info("New discovery event data: " + evtData + ']'); +log.info("New discovery event data [evt=" + evtData + ", evtsHist=" + evts.size() + ']'); switch (evtData.eventType()) { case EventType.EVT_NODE_JOINED: {
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc297aa4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc297aa4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc297aa4 Branch: refs/heads/ignite-zk Commit: bc297aa4b47cae1e5cb87e5d7863900c82fa90ce Parents: 9761a02 Author: sboikov Authored: Wed Nov 22 13:48:27 2017 +0300 Committer: sboikov Committed: Wed Nov 22 13:48:27 2017 +0300 -- .../apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/bc297aa4/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java -- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 591f18d..f0fcaca 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -130,6 +130,8 @@ class ZkIgnitePaths { } String customEventDataPath(boolean ack, String child) { -return ack ? customEvtsAcksDir : customEvtsDir + "/" + child; +String baseDir = ack ? customEvtsAcksDir : customEvtsDir; + +return baseDir + "/" + child; } }
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/11e2567f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/11e2567f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/11e2567f Branch: refs/heads/ignite-zk Commit: 11e2567fffa724e6b4af6021cda1bfbcf775370b Parents: c55d5c2 Author: sboikovAuthored: Thu Nov 16 16:10:04 2017 +0300 Committer: sboikov Committed: Thu Nov 16 17:38:47 2017 +0300 -- .../spi/discovery/zk/ZookeeperClusterNode.java | 220 -- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 2 +- .../discovery/zk/ZookeeperDiscoverySpi2.java| 664 +++ .../discovery/zk/internal/ZkDiscoveryImpl.java | 61 ++ .../discovery/zk/internal/ZookeeperClient.java | 348 ++ .../ZookeeperClientFailedException.java | 30 + .../zk/internal/ZookeeperClusterNode.java | 220 ++ .../zk/internal/ZookeeperClientTest.java| 254 +++ 8 files changed, 1578 insertions(+), 221 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/11e2567f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java -- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java deleted file mode 100644 index ae638b8..000 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperClusterNode.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.zk; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.ClusterMetricsSnapshot; -import org.apache.ignite.internal.IgniteNodeAttributes; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteProductVersion; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID; - -/** - * - */ -public class ZookeeperClusterNode implements ClusterNode, Serializable { -/** */ -private UUID id; - -/** */ -private Serializable consistentId; - -/** */ -private long internalOrder; - -/** */ -private long order; - -/** */ -private IgniteProductVersion ver; - -/** Node attributes. */ -@GridToStringExclude -private Map attrs; - -/** */ -private transient boolean loc; - -/** TODO ZK */ -private transient ClusterMetrics metrics; - -/** */ -private boolean client; - -/** - * @param id Node ID. - * @param ver Node version. - * @param attrs Node attributes. - * @param consistentId Consistent ID. - * @param client Client node flag. 
- */ -public ZookeeperClusterNode(UUID id, -IgniteProductVersion ver, -Map attrs, -Serializable consistentId, -boolean client) { -assert id != null; -assert consistentId != null; - -this.id = id; -this.ver = ver; -this.attrs = U.sealMap(attrs); -this.consistentId = consistentId; -this.client = client; -} - -/** {@inheritDoc} */ -@Override public UUID id() { -return id; -} - -/** {@inheritDoc} */ -@Override public Object consistentId() { -return consistentId; -} - -/** - * Sets consistent globally unique node ID which survives node restarts. - * - *
[2/2] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6ed2564a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6ed2564a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6ed2564a Branch: refs/heads/ignite-zk Commit: 6ed2564a8d68e651cb776e13302d62f415938bea Parents: 9970b95 Author: sboikovAuthored: Mon Nov 13 14:24:54 2017 +0300 Committer: sboikov Committed: Mon Nov 13 14:24:54 2017 +0300 -- .../communication/tcp/TcpCommunicationSpi.java | 18 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 47 ++-- .../zk/ZookeeperDiscoverySpiBasicTest.java | 36 ++- 3 files changed, 77 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/6ed2564a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 49425ce..04683ac 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -478,19 +478,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (rmtNode == null) { DiscoverySpi discoverySpi = ignite().configuration().getDiscoverySpi(); -assert discoverySpi instanceof TcpDiscoverySpi; - -TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) discoverySpi; +boolean unknownNode = true; -ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId); +if (discoverySpi instanceof TcpDiscoverySpi) { +TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) discoverySpi; -boolean unknownNode = true; +ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId); -if (node0 != null) { -assert node0.isClient() : node0; +if (node0 != null) { +assert node0.isClient() : node0; -if 
(node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0) -unknownNode = false; +if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0) +unknownNode = false; +} } if (unknownNode) { http://git-wip-us.apache.org/repos/asf/ignite/blob/6ed2564a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java -- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index f36a6e2..85b0aa5 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; @@ -36,6 +37,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -415,9 +417,16 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery List res = zk.multi(joinOps); -log.info("Waiting for local join event."); +log.info("Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); + +for(;;) { +if (!joinLatch.await(10, TimeUnit.SECONDS)) { +U.warn(log, "Waiting for local join 
event [nodeId=" +