Author: markrmiller
Date: Wed Jan 13 22:29:24 2010
New Revision: 898977
URL: http://svn.apache.org/viewvc?rev=898977&view=rev
Log:
start some changes - core/shard zknodes now persistent, ephemeral zknodes done
at the node (webapp) level, begin refactor of zk state
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java
URL:
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java
(original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java
Wed Jan 13 22:29:24 2010
@@ -24,11 +24,11 @@
private Map<String,CollectionInfo> collectionInfos = new
HashMap<String,CollectionInfo>();
//nocommit
- public synchronized void addCollectionInfo(String collection, CollectionInfo
collectionInfo) {
+ public void addCollectionInfo(String collection, CollectionInfo
collectionInfo) {
collectionInfos.put(collection, collectionInfo);
}
- public synchronized CollectionInfo getCollectionInfo(String collection) {
+ public CollectionInfo getCollectionInfo(String collection) {
return collectionInfos.get(collection);
}
}
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java
URL:
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
---
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java
(original)
+++
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java
Wed Jan 13 22:29:24 2010
@@ -44,22 +44,27 @@
static final String ROLE_PROP = "role";
// maps shard name to the shard addresses and roles
- private final Map<String,ShardInfoList> shardNameToShardInfoList;
+ private final Map<String,Properties> shards;
private final long updateTime;
- public CollectionInfo(Map<String,ShardInfoList> shardNameToShardInfoList) {
+ private final List<String> nodes;
+
+
+
+ public CollectionInfo(Map<String,Properties> shards, List<String> nodes) {
//nocommit: defensive copy?
- this.shardNameToShardInfoList = shardNameToShardInfoList;
+ this.shards = shards;
this.updateTime = System.currentTimeMillis();
+ this.nodes = nodes;
}
- public CollectionInfo(SolrZkClient client, String path) throws
KeeperException, InterruptedException, IOException {
- //nocommit:
- // build immutable CollectionInfo
- shardNameToShardInfoList = readShardInfo(client, path);
-
- this.updateTime = System.currentTimeMillis();
- }
+// public CollectionInfo(SolrZkClient client, String path) throws
KeeperException, InterruptedException, IOException {
+// //nocommit:
+// // build immutable CollectionInfo
+// shardNameToShardInfoList = readShardInfo(client, path);
+// nodes = client.getChildren(path, null);
+// this.updateTime = System.currentTimeMillis();
+// }
/**
* Read info on the available Shards and Nodes.
@@ -80,7 +85,7 @@
throw new IllegalStateException("Cannot find zk shards node that should
exist:"
+ path);
}
- List<String> nodes = zkClient.getChildren(path, null);
+
for (String zkNodeName : nodes) {
byte[] data = zkClient.getData(path + "/" + zkNodeName, null,
@@ -120,18 +125,17 @@
*
* @return
*/
- public List<String> getSearchShards() {
- List<String> nodeList = new ArrayList<String>();
- for (ShardInfoList nodes : shardNameToShardInfoList.values()) {
- nodeList.add(nodes.getShardUrl());
- }
- return nodeList;
- }
-
- public ShardInfoList getShardInfoList(String shardName) {
- return shardNameToShardInfoList.get(shardName);
- }
+// public List<String> getSearchShards() {
+// List<String> nodeList = new ArrayList<String>();
+// for (ShardInfoList nodes : shardNameToShardInfoList.values()) {
+// nodeList.add(nodes.getShardUrl());
+// }
+// return nodeList;
+// }
+ public List<String> getNodes() {
+ return Collections.unmodifiableList(nodes);
+ }
/**
* @return last time info was updated.
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
URL:
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
---
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
(original)
+++
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
Wed Jan 13 22:29:24 2010
@@ -101,32 +101,10 @@
log.info("Connected:" + connected);
} else if (state == KeeperState.Disconnected) {
- // nocommit : not sure we have to reconnect like this on disconnect
- if(connected == false) {
- // nocommit
- System.out.println("we already know we are dc'd - why are we notified
twice?");
- return;
- }
+ // nocommit : not sure we have to reconnect on disconnect
+ // ZooKeeper will recover when it can
connected = false;
- // nocommit: start reconnect attempts - problem if this is shutdown
related?
- try {
- connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
new ZkClientConnectionStrategy.ZkUpdate() {
- @Override
- public void update(ZooKeeper keeper) throws InterruptedException,
TimeoutException, IOException {
- waitForConnected(SolrZkClient.CONNECT_TIMEOUT);
- client.updateKeeper(keeper);
- if(onReconnect != null) {
- onReconnect.command();
- }
- ConnectionManager.this.connected = true;
- }
- });
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
} else {
connected = false;
}
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
URL:
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
---
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
(original)
+++
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
Wed Jan 13 22:29:24 2010
@@ -42,10 +42,15 @@
public void process(WatchedEvent event) {
// nocommit : this will be called too often as shards register themselves?
System.out.println("shard node changed");
+
try {
+ // nocommit:
+ controller.printLayoutToStdOut();
// nocommit : refresh watcher
- // controller.getKeeperConnection().exists(event.getPath(), this);
+
+ // nocommit : rewatch
+ controller.getZkClient().exists(event.getPath(), this);
// TODO: need to load whole state?
controller.readCloudInfo();
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
URL:
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
(original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
Wed Jan 13 22:29:24 2010
@@ -240,6 +240,11 @@
InterruptedException {
makePath(path, null, CreateMode.PERSISTENT);
}
+
+ public void makePath(String path, CreateMode createMode) throws
KeeperException,
+ InterruptedException {
+ makePath(path, null, createMode);
+ }
/**
* Creates the path in ZooKeeper, creating each node as necessary.
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL:
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
(original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
Wed Jan 13 22:29:24 2010
@@ -26,9 +26,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -89,6 +91,8 @@
private String localHostName;
private String localHost;
+ private String hostName;
+
/**
*
* @param zkServerAddress ZooKeeper server host address
@@ -142,7 +146,7 @@
* nocommit: adds nodes if they don't exist, eg /shards/ node. consider race
* conditions
*/
- private void addZkShardsNode(String collection) throws IOException {
+ private void addZkCoresNode(String collection) throws IOException {
String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection +
SHARDS_ZKNODE;
try {
@@ -309,24 +313,24 @@
}
// nocommit - testing
- public String getSearchNodes(String collection) {
- StringBuilder nodeString = new StringBuilder();
- boolean first = true;
- List<String> nodes;
-
- nodes = cloudInfo.getCollectionInfo(collection).getSearchShards();
- // nocommit
- System.out.println("there are " + nodes.size() + " node(s)");
- for (String node : nodes) {
- nodeString.append(node);
- if (first) {
- first = false;
- } else {
- nodeString.append(',');
- }
- }
- return nodeString.toString();
- }
+// public String getSearchNodes(String collection) {
+// StringBuilder nodeString = new StringBuilder();
+// boolean first = true;
+// List<String> nodes;
+//
+// nodes = cloudInfo.getCollectionInfo(collection).getSearchShards();
+// // nocommit
+// System.out.println("there are " + nodes.size() + " node(s)");
+// for (String node : nodes) {
+// nodeString.append(node);
+// if (first) {
+// first = false;
+// } else {
+// nodeString.append(',');
+// }
+// }
+// return nodeString.toString();
+// }
SolrZkClient getZkClient() {
return zkClient;
@@ -344,8 +348,9 @@
try {
localHostName = getHostAddress();
Matcher m = URL_POST.matcher(localHostName);
+
if (m.matches()) {
- String hostName = m.group(1);
+ hostName = m.group(1);
// register host
zkClient.makePath(hostName);
} else {
@@ -354,8 +359,24 @@
+ localHostName);
}
+ // makes nodes node
+ try {
+ zkClient.makePath("/nodes");
+ } catch (KeeperException e) {
+ // its okay if another beats us creating the node
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ log.error("ZooKeeper Exception", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "ZooKeeper Exception", e);
+ }
+ }
+ String nodeName = hostName + ":" + localHostPort + "_"+ localHostContext;
+ zkClient.makePath("/nodes/" + nodeName, CreateMode.EPHEMERAL);
+
// nocommit
setUpCollectionsNode();
+
+
} catch (IOException e) {
log.error("", e);
@@ -385,7 +406,11 @@
for (String collection : collections) {
String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection
+ SHARDS_ZKNODE;
- CollectionInfo collectionInfo = new CollectionInfo(zkClient,
shardsZkPath);
+
+ List<String> nodes = zkClient.getChildren(shardsZkPath, null);
+ Map<String,Properties> shards = readShardsInfo(collection, shardsZkPath,
nodes);
+
+ CollectionInfo collectionInfo = new CollectionInfo(shards, nodes);
cloudInfo.addCollectionInfo(collection, collectionInfo);
}
@@ -470,21 +495,22 @@
* Read info on the available Shards and Nodes.
*
* @param path to the shards zkNode
+ * @param shardsZkPath
+ * @param nodeList
* @return Map from shard name to a {...@link ShardInfoList}
* @throws InterruptedException
* @throws KeeperException
* @throws IOException
*/
- public Map<String,ShardInfoList> readShardsNode(String path)
+ public Map<String,Properties> readShardsInfo(String collection, String path,
List<String> nodes)
throws KeeperException, InterruptedException, IOException {
-
- HashMap<String,ShardInfoList> shardNameToShardList = new
HashMap<String,ShardInfoList>();
-
- if (!zkClient.exists(path)) {
- throw new IllegalStateException("Cannot find zk node that should exist:"
- + path);
+ Set<String> nodesSet = new HashSet<String>(nodes.size());
+ nodesSet.addAll(nodes);
+ if(cloudInfo != null) {
+ List<String> oldNodes =
cloudInfo.getCollectionInfo(collection).getNodes();
}
- List<String> nodes = zkClient.getChildren(path, null);
+
+ Map<String,Properties> cores = new HashMap<String,Properties>();
for (String zkNodeName : nodes) {
byte[] data = zkClient.getData(path + "/" + zkNodeName, null, null);
@@ -492,30 +518,11 @@
Properties props = new Properties();
props.load(new ByteArrayInputStream(data));
- String url = (String) props.get(CollectionInfo.URL_PROP);
- String shardNameList = (String)
props.get(CollectionInfo.SHARD_LIST_PROP);
- String[] shardsNames = shardNameList.split(",");
- for (String shardName : shardsNames) {
- ShardInfoList sList = shardNameToShardList.get(shardName);
- List<ShardInfo> shardList;
- if (sList == null) {
- shardList = new ArrayList<ShardInfo>(1);
- } else {
- List<ShardInfo> oldShards = sList.getShards();
- shardList = new ArrayList<ShardInfo>(oldShards.size() + 1);
- shardList.addAll(oldShards);
- }
-
- ShardInfo shard = new ShardInfo(url);
- shardList.add(shard);
- ShardInfoList list = new ShardInfoList(shardList);
-
- shardNameToShardList.put(shardName, list);
- }
+ cores.put(zkNodeName, props);
}
- return Collections.unmodifiableMap(shardNameToShardList);
+ return Collections.unmodifiableMap(cores);
}
/**
@@ -548,7 +555,7 @@
// build layout if not exists
// nocommit : consider how we watch shards on all collections
- addZkShardsNode(collection);
+ addZkCoresNode(collection);
// create node
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -565,8 +572,19 @@
props.store(baos, PROPS_DESC);
- nodePath = zkClient.create(shardsZkPath + CORE_ZKPREFIX,
- baos.toByteArray(), CreateMode.EPHEMERAL_SEQUENTIAL);
+ String nodeName = hostName + ":" + localHostPort + "_"+ localHostContext +
(coreName.length() == 0 ? "" : "_" + coreName);
+ try {
+ nodePath = zkClient.create(shardsZkPath + "/" + nodeName,
+ baos.toByteArray(), CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ // its okay if the node already exists
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ throw e;
+ }
+ }
+
+ // signal that the shards node has changed
+ zkClient.setData(shardsZkPath, (byte[])null);
return nodePath;
}
@@ -627,8 +645,14 @@
public void process(WatchedEvent event) {
System.out.println("Collections node event:" + event);
+ // nocommit : if collections node was signaled, look for new
collections
}});
+
+ collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+ for(String collection : collections) {
+ zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/shards",
shardWatcher);
+ }
}
private void setUpCollectionsNode() throws KeeperException,
InterruptedException {
Modified:
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java
URL:
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java
(original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java
Wed Jan 13 22:29:24 2010
@@ -106,8 +106,6 @@
assertU(delQ("id:[100 TO 110]"));
assertU(commit());
assertQ(req("id:[100 TO 110]"), "//*...@numfound='0']");
-
- //nocommit
- System.out.println("search nodes:" +
h.getCoreContainer().getZooKeeperController().getSearchNodes("DEFAULT_CORE"));
+
}
}
Modified:
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL:
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=898977&r1=898976&r2=898977&view=diff
==============================================================================
---
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
(original)
+++
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
Wed Jan 13 22:29:24 2010
@@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -82,35 +83,20 @@
zkController = new ZkController(ZOO_KEEPER_ADDRESS, TIMEOUT,
"localhost", "8983", "/solr");
- Map<String,ShardInfoList> shardInfoMap = zkController
- .readShardsNode(shardsPath);
- assertTrue(shardInfoMap.size() > 0);
+ zkController.readCloudInfo();
+ CloudInfo cloudInfo = zkController.getCloudInfo();
+ CollectionInfo collectionInfo =
cloudInfo.getCollectionInfo("collection1");
+ assertNotNull(collectionInfo);
- Set<Entry<String,ShardInfoList>> entries = shardInfoMap.entrySet();
if (DEBUG) {
- for (Entry<String,ShardInfoList> entry : entries) {
- System.out.println("shard:" + entry.getKey() + " value:"
- + entry.getValue().toString());
+ for (String node : collectionInfo.getNodes()) {
+ System.out.println("shard:" + node);
}
}
- Set<String> keys = shardInfoMap.keySet();
-
- assertTrue(keys.size() == 2);
-
- assertTrue(keys.contains(SHARD1));
- assertTrue(keys.contains(SHARD2));
-
- ShardInfoList shardInfoList = shardInfoMap.get(SHARD1);
-
- assertEquals(3, shardInfoList.getShards().size());
-
- shardInfoList = shardInfoMap.get(SHARD2);
-
- assertEquals(1, shardInfoList.getShards().size());
-
- assertEquals(URL1, shardInfoList.getShards().get(0).getUrl());
+ // nocommit : check properties
+
} finally {
if (zkClient != null) {
zkClient.close();