[4/5] ignite git commit: zk

2017-11-22 Thread sboikov
http://git-wip-us.apache.org/repos/asf/ignite/blob/42bbed0a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
--
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
new file mode 100644
index 0000000..de54210
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -0,0 +1,1454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
+import static org.apache.zookeeper.CreateMode.PERSISTENT;
+
+/**
+ * TODO ZK: check if compression makes sense.
+ */
+public class ZookeeperDiscoveryImpl {
+/** */
+static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = 
"IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD";
+
+/** */
+private final JdkMarshaller marsh = new JdkMarshaller();
+
+/** */
+private final ZkIgnitePaths zkPaths;
+
+/** */
+private final IgniteLogger log;
+
+/** */
+private final ZookeeperClusterNode locNode;
+
+/** */
+private final DiscoverySpiListener lsnr;
+
+/** */
+private final DiscoverySpiDataExchange exchange;
+
+/** */
+private ZookeeperClient zkClient;
+
+/** */
+private final GridFutureAdapter joinFut = new GridFutureAdapter<>();
+
+/** */
+private final ZkWatcher watcher;
+
+/** */
+private final ZKChildrenCallback childrenCallback;
+
+/** */
+private final ZkDataCallback dataCallback;
+
+/** */
+private final ZkClusterNodes top = new ZkClusterNodes();
+
+/** */
+private long gridStartTime;
+
+/** */
+private boolean joined;
+
+/** */
+private ZkDiscoveryEventsData evtsData;
+
+/** */
+private boolean crd;
+
+/** */
+private String locNodeZkPath;
+
+/** */
+private ZkAliveNodeData locNodeInfo = new ZkAliveNodeData();
+
+/** */
+private final int evtsAckThreshold;
+
+/**
+ * @param log Logger.
+ * @param basePath Zookeeper base path node for all nodes.
+ * @param clusterName Cluster name.
+ * @param locNode Local node instance.
+ * @param lsnr Discovery events listener.
+ * @param exchange Discovery data exchange.
+ */
+public 

[4/5] ignite git commit: zk

2017-11-15 Thread sboikov
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aa0ca90c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aa0ca90c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aa0ca90c

Branch: refs/heads/ignite-zk
Commit: aa0ca90cbaec809715190c1036654a6aad0fb0a3
Parents: ac07cbe
Author: sboikov 
Authored: Wed Nov 15 12:50:14 2017 +0300
Committer: sboikov 
Committed: Wed Nov 15 12:50:14 2017 +0300

--
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 14 
 .../zk/ZookeeperDiscoverySpiBasicTest.java  | 34 
 2 files changed, 48 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/ignite/blob/aa0ca90c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
--
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 1659dcf..cee9e65 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -425,6 +426,19 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
 
 if (log != null)
 log.info("Curator event, connection: " + newState);
+
+if (newState == ConnectionState.LOST) {
+U.warn(log, "Connection was lost, local node 
SEGMENTED");
+
+zkCurator.close();
+
+lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
+0,
+locNode,
+Collections.emptyList(),
+Collections.emptyMap(),
+null);
+}
 }
 });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa0ca90c/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
--
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index 4cd86db..bc577d2 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -24,6 +24,8 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.curator.test.TestingCluster;
 import org.apache.ignite.Ignite;
@@ -262,6 +264,38 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 /**
  * @throws Exception If failed.
  */
+public void testSegmentation1() throws Exception {
+sesTimeout = 2000;
+testSockNio = true;
+
+Ignite node0 = startGrid(0);
+
+final CountDownLatch l = new CountDownLatch(1);
+
+node0.events().localListen(new IgnitePredicate() {
+@Override public boolean apply(Event event) {
+l.countDown();
+
+return false;
+}
+}, EventType.EVT_NODE_SEGMENTED);
+
+ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
+
+c0.closeSocket(true);
+
+Thread.sleep(4_000);
+
+info("Allow connect");
+
+c0.allowConnect();
+
+assertTrue(l.await(10, TimeUnit.SECONDS));
+}
+
+/**
+ * @throws Exception If failed.
+ */
 public void testConnectionRestore1() throws Exception {
 testSockNio = true;