[4/5] ignite git commit: zk
http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java new file mode 100644 index 000..de54210 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -0,0 +1,1454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; +import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.jetbrains.annotations.Nullable; + +import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL; +import static org.apache.zookeeper.CreateMode.PERSISTENT; + +/** + * TODO ZK: check if compression makes sense. 
+ */ +public class ZookeeperDiscoveryImpl { +/** */ +static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = "IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD"; + +/** */ +private final JdkMarshaller marsh = new JdkMarshaller(); + +/** */ +private final ZkIgnitePaths zkPaths; + +/** */ +private final IgniteLogger log; + +/** */ +private final ZookeeperClusterNode locNode; + +/** */ +private final DiscoverySpiListener lsnr; + +/** */ +private final DiscoverySpiDataExchange exchange; + +/** */ +private ZookeeperClient zkClient; + +/** */ +private final GridFutureAdapter joinFut = new GridFutureAdapter<>(); + +/** */ +private final ZkWatcher watcher; + +/** */ +private final ZKChildrenCallback childrenCallback; + +/** */ +private final ZkDataCallback dataCallback; + +/** */ +private final ZkClusterNodes top = new ZkClusterNodes(); + +/** */ +private long gridStartTime; + +/** */ +private boolean joined; + +/** */ +private ZkDiscoveryEventsData evtsData; + +/** */ +private boolean crd; + +/** */ +private String locNodeZkPath; + +/** */ +private ZkAliveNodeData locNodeInfo = new ZkAliveNodeData(); + +/** */ +private final int evtsAckThreshold; + +/** + * @param log Logger. + * @param basePath Zookeeper base path node all nodes. + * @param clusterName Cluster name ( + * @param locNode Local node instance. + * @param lsnr Discovery events listener. + * @param exchange Discovery data exchange. + */ +public
[4/5] ignite git commit: zk
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aa0ca90c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aa0ca90c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aa0ca90c Branch: refs/heads/ignite-zk Commit: aa0ca90cbaec809715190c1036654a6aad0fb0a3 Parents: ac07cbe Author: sboikov Authored: Wed Nov 15 12:50:14 2017 +0300 Committer: sboikov Committed: Wed Nov 15 12:50:14 2017 +0300 -- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 14 .../zk/ZookeeperDiscoverySpiBasicTest.java | 34 2 files changed, 48 insertions(+) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/aa0ca90c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java -- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 1659dcf..cee9e65 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -425,6 +426,19 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery if (log != null) log.info("Curator event, connection: " + newState); + +if (newState == ConnectionState.LOST) { +U.warn(log, "Connection was lost, local node SEGMENTED"); + +zkCurator.close(); + +lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED, +0, +locNode, +Collections.emptyList(), +Collections. 
emptyMap(), +null); +} } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/aa0ca90c/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java -- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index 4cd86db..bc577d2 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -24,6 +24,8 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; @@ -262,6 +264,38 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ +public void testSegmentation1() throws Exception { +sesTimeout = 2000; +testSockNio = true; + +Ignite node0 = startGrid(0); + +final CountDownLatch l = new CountDownLatch(1); + +node0.events().localListen(new IgnitePredicate() { +@Override public boolean apply(Event event) { +l.countDown(); + +return false; +} +}, EventType.EVT_NODE_SEGMENTED); + +ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); + +c0.closeSocket(true); + +Thread.sleep(4_000); + +info("Allow connect"); + +c0.allowConnect(); + +assertTrue(l.await(10, TimeUnit.SECONDS)); +} + +/** + * @throws Exception If failed. + */ public void testConnectionRestore1() throws Exception { testSockNio = true;