Author: markrmiller
Date: Tue Jan 26 16:32:42 2010
New Revision: 903310
URL: http://svn.apache.org/viewvc?rev=903310&view=rev
Log:
updates and cleanup
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL:
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=903310&r1=903309&r2=903310&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
(original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
Tue Jan 26 16:32:42 2010
@@ -80,9 +80,6 @@
public static final String ROLE_PROP = "role";
public static final String NODE_NAME = "node_name";
- // for when we do incremental cloud state updates
- //final ShardsWatcher shardWatcher = new ShardsWatcher(this);
-
private SolrZkClient zkClient;
private volatile CloudState cloudState;
@@ -126,12 +123,11 @@
public void command() {
try {
- // nocommit : re-register ephemeral nodes, (possibly) wait a
while
- // for others to do the same, then load
+
createEphemeralNode();
// register cores in case any new cores came online while zk was
down
- // coreContainer may currently be null in tests, so don't
reregister
+ // coreContainer may currently be null in tests, so don't
re-register
if(coreContainer != null) {
Collection<SolrCore> cores = coreContainer.getCores();
for(SolrCore core : cores) {
@@ -179,7 +175,7 @@
// makes shards zkNode if it doesn't exist
zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
- // nocommit
+ // nocommit - scrutinize
// ping that there is a new collection or a new shardId
zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
}
@@ -234,15 +230,6 @@
return cloudState;
}
- private List<String> getCollectionNames() throws KeeperException,
- InterruptedException {
-
- List<String> collectionNodes = zkClient.getChildren(COLLECTIONS_ZKNODE,
- null);
-
- return collectionNodes;
- }
-
/**
* Load SolrConfig from ZooKeeper.
*
@@ -361,6 +348,9 @@
// makes nodes node
try {
+ // TODO: for now, no watch - if a node goes down or comes up, it's
going to change
+ // shards info anyway and cause a state update - this could change if
we do incremental
+ // state update
zkClient.makePath(NODES_ZKNODE);
} catch (KeeperException e) {
// it's okay if another beats us creating the node
@@ -670,16 +660,16 @@
"", e);
}
- log.info("Start watching collections node for changes");
+ log.info("Start watching collections zk node for changes");
zkClient.getChildren(COLLECTIONS_ZKNODE, new Watcher(){
- public synchronized void process(WatchedEvent event) {
+ public void process(WatchedEvent event) {
try {
- // TODO: fine grained - just reload what's changed
- // nocommit
- log.info("Notified of collection change");
- addShardZkNodeWatches();
- updateCloudState(false);
+ log.info("Detected a new or removed collection");
+ synchronized (ZkController.this) {
+ addShardZkNodeWatches();
+ updateCloudState(false);
+ }
// re-watch
zkClient.getChildren(event.getPath(), this);
} catch (KeeperException e) {
@@ -702,14 +692,16 @@
zkClient.exists(COLLECTIONS_ZKNODE, new Watcher(){
- public synchronized void process(WatchedEvent event) {
+ public void process(WatchedEvent event) {
if(event.getType() != EventType.NodeDataChanged) {
return;
}
log.info("Notified of CloudState change");
try {
- addShardZkNodeWatches();
- updateCloudState(false);
+ synchronized (ZkController.this) {
+ addShardZkNodeWatches();
+ updateCloudState(false);
+ }
zkClient.exists(COLLECTIONS_ZKNODE, this);
} catch (KeeperException e) {
log.error("", e);
@@ -737,30 +729,53 @@
List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
for(final String collection : collections) {
if(!knownCollections.contains(collection)) {
- zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection +
SHARDS_ZKNODE, new Watcher(){
-
+ Watcher watcher = new Watcher() {
public void process(WatchedEvent event) {
- //nocommit
- System.out.println("ShardId node added/removed/changed:");
+ log.info("Detected changed ShardId in collection:" + collection);
try {
addShardsWatches(collection);
+ updateCloudState(false);
} catch (KeeperException e) {
log.error("", e);
- throw new
ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
- throw new
ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
}
- }});
+ }
+ };
+ boolean madeWatch = true;
+
+ for (int i = 0; i < 5; i++) {
+ try {
+ zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
+ + SHARDS_ZKNODE, watcher);
+ } catch (KeeperException.NoNodeException e) {
+ // most likely, the collections node has been created, but not the
+ // shards node yet -- pause and try again
+ madeWatch = false;
+ if(i == 4) {
+ throw e;
+ }
+ Thread.sleep(50);
+ }
+ if(madeWatch) {
+ break;
+ }
+ }
}
}
}
- public void addShardsWatches(String collection) throws KeeperException,
+ public void addShardsWatches(final String collection) throws KeeperException,
InterruptedException {
if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection +
SHARDS_ZKNODE)) {
List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
@@ -779,9 +794,24 @@
+ SHARDS_ZKNODE + "/" + shardId, new Watcher() {
public void process(WatchedEvent event) {
- // nocommit
- System.out.println("shard changed under:" + shardId);
-
+ log.info("Detected a shard change under ShardId:" + shardId + "
in collection:" + collection);
+ try {
+ updateCloudState(false);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new
ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new
ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new
ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
}
});
}
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
URL:
http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java?rev=903310&r1=903309&r2=903310&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
(original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
Tue Jan 26 16:32:42 2010
@@ -425,8 +425,10 @@
if(zooKeeperController != null) {
try {
- zooKeeperController.addShardZkNodeWatches();
- zooKeeperController.updateCloudState(true);
+ synchronized (zooKeeperController) {
+ zooKeeperController.addShardZkNodeWatches();
+ zooKeeperController.updateCloudState(true);
+ }
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();