Hello All:
I'm trying to figure out why, when using the CuratorFramework and NodeCache, I
can read from a path in zookeeper/curator at the framework level, but NodeCache
was somehow not able to read from the same path using the same CuratorFamework.
Is the code below incorrect, I assumed that the path returned when creating a
node would be suitable for NodeCache, and that the namespace of the
CuratorFramework would be inherited by the derived NodeCache instance.
Please consider the following scala test:
package com.myCompany.myProject.test
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.cache.NodeCache
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.{TestingCluster, TestingServer}
import org.apache.zookeeper.CreateMode
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.slf4j.LoggerFactory
import collection.JavaConverters._
/**
* Created by FoolishEwe on 2/16/17.
*/
class CuratorSuite extends FunSuite with BeforeAndAfterAll {
@transient
val logger = LoggerFactory.getLogger(this.getClass);
val nameSpace="testingNameSpace";
val testServerPort = 31314;
val testServer : TestingServer = new TestingServer(testServerPort);
val connectionString= testServer.getConnectString;
val retryTimeOutMsec = 1000;
val connectionTimeOutMsec = 1000;
val closeWaitTimeOutMsec = 1000;
val sessionTimeOutMsec = 1000;
val maxRetries = 3;
val retryPolicy = new ExponentialBackoffRetry(retryTimeOutMsec, maxRetries);
override def beforeAll() {
val port = testServer.getPort;
val tmpDir = testServer.getTempDirectory;
logger.info(s"""Test Server is running on Port ${port},
testServerPort=${testServerPort}, using tmpDir="${tmpDir}",
connectionString="${connectionString}".""");
}
override def afterAll() {
logger.info(s"""after: Shutting down testServer""")
testServer.close(); // shut it down and clean up
}
test("See if NodeCache actually finds the given node after creation by
CuratorFramework") {
val clientFactory =
CuratorFrameworkFactory.builder().connectString(connectionString).namespace(nameSpace);
// TODO: Should we be monitoring the ensemble? For now let's pull the
connection string from the ensemble, I think we may be able to see if we have
a stale connection string.
val ensembleProvider = clientFactory.getEnsembleProvider; // Allows us to
determine current set of active nodes participating in this cluster
logger.debug( s"""getCuratorFramework has
ensembleProvider.getConnectionString=${ensembleProvider.getConnectionString}""");
val client = clientFactory.
retryPolicy(retryPolicy).
connectionTimeoutMs(connectionTimeOutMsec).
maxCloseWaitMs(closeWaitTimeOutMsec).
sessionTimeoutMs(sessionTimeOutMsec).
namespace(nameSpace).
build(); // create the client
logger.info( s"""After building client before client.start""");
client.start();
logger.info( s"""client.start finished""");
val path = "/APath";
val testData = "This is some test data.";
val nodeName =
client.create.creatingParentContainersIfNeeded.withProtection.withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,
testData.getBytes());
val nodeStat = client.checkExists().forPath(nodeName);
val nodeData = client.getData.forPath(nodeName);
val nodeDataStr = new String(nodeData);
logger.info(s"""nodeData=${nodeDataStr}, nodeStat has
czxid=${nodeStat.getCzxid}, mzxid=${nodeStat.getMzxid},
pzxid=${nodeStat.getPzxid} ctime=${nodeStat.getCtime},
mtime=${nodeStat.getMtime}, version=${nodeStat.getVersion},
aVersion=${nodeStat.getAversion}, cVersion=${nodeStat.getCversion},
dataLength=${nodeStat.getDataLength}, numChildren=${nodeStat.getNumChildren},
ephemeralOwner=${nodeStat.getEphemeralOwner}""");
val nodeCache = new NodeCache(client, nodeName);
assert(nodeCache != null, s"""Got null nodeCache(client,
nodeName=${nodeName})""");
val currentData = nodeCache.getCurrentData;
assert(currentData != null , s"""getCurrentData was null from
nodeName=${nodeName}""");
val childStat = currentData.getStat;
assert(childStat != null);
logger.info(s"""got childStat = ${childStat}""");
val childData = currentData.getData;
val childDataStr = new String(childData);
assert(childDataStr == testData);
val childPath = currentData.getPath;
assert(path == nodeName);
nodeCache.close;
}
}
When I run it, I see that the get data succeeds but the getCurrentData on the
node cache failed.
*** Start sanitized output****
[Classpath dump suppressed]
Testing started at 5:03 PM ...
17:03:43,852 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could
NOT find resource [logback.groovy]
17:03:43,852 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could
NOT find resource [logback-test.xml]
17:03:43,968 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About
to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
17:03:43,971 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming
appender as [STDOUT]
17:03:44,033 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction -
Setting level of logger [com.myCompany.myProject] to DEBUG
17:03:44,033 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction -
Setting level of ROOT logger to ERROR
17:03:44,033 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction -
Attaching appender named [STDOUT] to Logger[ROOT]
17:03:44,034 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction
- End of configuration.
17:03:44,035 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@54d9d12d
- Registering current configuration as safe fallback
point2017:03:03:17:03:45.350 [ScalaTest-run-running-CuratorSuite] DEBUG
c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:45 - getCuratorFramework
has ensembleProvider.getConnectionString=127.0.0.1:31314
2017:03:03:17:03:45.387 [ScalaTest-run-running-CuratorSuite] INFO
c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:53 - After building client
before client.start
2017:03:03:17:03:45.403 [ScalaTest-run-running-CuratorSuite] INFO
c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:55 - client.start finished
2017:03:03:17:03:45.436 [ScalaTest-run-running-CuratorSuite] INFO
c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:62 - nodeData=This is some
test data., nodeStat has czxid=4, mzxid=4, pzxid=4 ctime=1488589425430,
mtime=1488589425430, version=0, aVersion=0, cVersion=0, dataLength=23,
numChildren=0, ephemeralOwner=0
null equaled null getCurrentData was null from
nodeName=/_c_ad40bede-7898-4dac-a002-7005e3d501b1-APath0000000000
ScalaTestFailureLocation: com.myCompany.myProject.test.CuratorSuite$$anonfun$1
at (CuratorSuite.scala:66)
org.scalatest.exceptions.TestFailedException: null equaled null getCurrentData
was null from nodeName=/_c_ad40bede-7898-4dac-a002-7005e3d501b1-APath0000000000
at
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
at
com.myCompany.myProject.test.CuratorSuite$$anonfun$1.apply$mcV$sp(CuratorSuite.scala:66)
at
com.myCompany.myProject.test.CuratorSuite$$anonfun$1.apply(CuratorSuite.scala:41)
at
com.myCompany.myProject.test.CuratorSuite$$anonfun$1.apply(CuratorSuite.scala:41)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at
com.myCompany.myProject.test.CuratorSuite.org$scalatest$BeforeAndAfterAll$$super$run(CuratorSuite.scala:15)
at
org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at com.myCompany.myProject.CuratorSuite.run(CuratorSuite.scala:15)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
at
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
at
org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at
org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
at
org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
at
org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
at
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
c.m.m.t.CuratorSuite.beforeAll:33 - Test Server is running on Port 31314,
testServerPort=31314, using
tmpDir="/var/folders/wr/hnqlgq7s3gl_x6fx0684fx3m0000gn/T/1488589424181-0",
connectionString="127.0.0.1:31314".
2017:03:03:17:03:45.460 [ScalaTest-run] INFO
c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.afterAll:37 - after: Shutting down
testServer
Process finished with exit code 0
******End of sanitized output**********
I'm able to work around this by dropping down to the CuaratorFramework level,
but it would be nice to learn what went wrong here.
With best regards:
Bill