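Your NodeCache is never started. You must call start() on it; until you do, getCurrentData() returns null. Also, given that you want results immediately, you should use start(true), which builds the initial view of the node before it returns.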
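As a rough illustration of that fix, here is a minimal sketch rather than a drop-in replacement for your test. The object name NodeCacheStartSketch and the helper readThroughNodeCache are made up for this example, and it assumes a Curator version that still ships the NodeCache recipe along with curator-test on the classpath. It creates a node through a namespaced client and then reads it back through a NodeCache that is started with start(true).

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.cache.NodeCache
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.TestingServer

object NodeCacheStartSketch {

  // Hypothetical helper (not part of Curator): creates a node with the given
  // payload, then reads the payload back through a started NodeCache.
  def readThroughNodeCache(connectString: String, path: String, payload: String): String = {
    val client = CuratorFrameworkFactory.builder()
      .connectString(connectString)
      .namespace("testingNameSpace") // same namespace as in the original test
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()
    try {
      client.create().creatingParentsIfNeeded().forPath(path, payload.getBytes("UTF-8"))

      // The cache issues its reads through the same client, so the path it is
      // given is resolved relative to the client's namespace.
      val nodeCache = new NodeCache(client, path)
      try {
        // start(true) builds the initial view before returning; with a plain
        // start() the data arrives asynchronously and may still be null here.
        nodeCache.start(true)
        val current = nodeCache.getCurrentData
        new String(current.getData, "UTF-8")
      } finally {
        nodeCache.close()
      }
    } finally {
      client.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val server = new TestingServer() // in-process ZooKeeper on a free port
    try {
      println(readThroughNodeCache(server.getConnectString, "/APath", "This is some test data."))
    } finally {
      server.close()
    }
  }
}
```

In your test the equivalent change would be a nodeCache.start(true) call between constructing the NodeCache and calling getCurrentData.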
-Jordan

> On Mar 3, 2017, at 5:33 PM, Foolish Ewe <[email protected]> wrote:
>
> Hello All:
>
> I'm trying to figure out why, when using the CuratorFramework and NodeCache,
> I can read from a path in zookeeper/curator at the framework level, but
> NodeCache was somehow not able to read from the same path using the same
> CuratorFramework. Is the code below incorrect? I assumed that the path
> returned when creating a node would be suitable for NodeCache, and that the
> namespace of the CuratorFramework would be inherited by the derived NodeCache
> instance.
>
> Please consider the following scala test:
>
> package com.myCompany.myProject.test
>
> import org.apache.curator.framework.CuratorFrameworkFactory
> import org.apache.curator.framework.recipes.cache.NodeCache
> import org.apache.curator.retry.ExponentialBackoffRetry
> import org.apache.curator.test.{TestingCluster, TestingServer}
> import org.apache.zookeeper.CreateMode
> import org.scalatest.{BeforeAndAfterAll, FunSuite}
> import org.slf4j.LoggerFactory
> import collection.JavaConverters._
>
> /**
>   * Created by FoolishEwe on 2/16/17.
>   */
> class CuratorSuite extends FunSuite with BeforeAndAfterAll {
>
>   @transient
>   val logger = LoggerFactory.getLogger(this.getClass);
>   val nameSpace="testingNameSpace";
>   val testServerPort = 31314;
>   val testServer : TestingServer = new TestingServer(testServerPort);
>   val connectionString= testServer.getConnectString;
>   val retryTimeOutMsec = 1000;
>   val connectionTimeOutMsec = 1000;
>   val closeWaitTimeOutMsec = 1000;
>   val sessionTimeOutMsec = 1000;
>   val maxRetries = 3;
>   val retryPolicy = new ExponentialBackoffRetry(retryTimeOutMsec, maxRetries);
>
>   override def beforeAll() {
>     val port = testServer.getPort;
>     val tmpDir = testServer.getTempDirectory;
>     logger.info(s"""Test Server is running on Port ${port}, testServerPort=${testServerPort}, using tmpDir="${tmpDir}", connectionString="${connectionString}".""");
>   }
>
>   override def afterAll() {
>     logger.info(s"""after: Shutting down testServer""")
>     testServer.close(); // shut it down and clean up
>   }
>
>   test("See if NodeCache actually finds the given node after creation by CuratorFramework") {
>     val clientFactory = CuratorFrameworkFactory.builder().connectString(connectionString).namespace(nameSpace);
>     // TODO: Should we be monitoring the ensemble? For now let's pull the
>     // connection string from the ensemble, I think we may be able to see if we
>     // have a stale connection string.
>     val ensembleProvider = clientFactory.getEnsembleProvider; // Allows us to determine current set of active nodes participating in this cluster
>     logger.debug( s"""getCuratorFramework has ensembleProvider.getConnectionString=${ensembleProvider.getConnectionString}""");
>     val client = clientFactory.
>       retryPolicy(retryPolicy).
>       connectionTimeoutMs(connectionTimeOutMsec).
>       maxCloseWaitMs(closeWaitTimeOutMsec).
>       sessionTimeoutMs(sessionTimeOutMsec).
>       namespace(nameSpace).
>       build(); // create the client
>     logger.info( s"""After building client before client.start""");
>     client.start();
>     logger.info( s"""client.start finished""");
>     val path = "/APath";
>     val testData = "This is some test data.";
>     val nodeName = client.create.creatingParentContainersIfNeeded.withProtection.withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, testData.getBytes());
>     val nodeStat = client.checkExists().forPath(nodeName);
>     val nodeData = client.getData.forPath(nodeName);
>     val nodeDataStr = new String(nodeData);
>     logger.info(s"""nodeData=${nodeDataStr}, nodeStat has czxid=${nodeStat.getCzxid}, mzxid=${nodeStat.getMzxid}, pzxid=${nodeStat.getPzxid} ctime=${nodeStat.getCtime}, mtime=${nodeStat.getMtime}, version=${nodeStat.getVersion}, aVersion=${nodeStat.getAversion}, cVersion=${nodeStat.getCversion}, dataLength=${nodeStat.getDataLength}, numChildren=${nodeStat.getNumChildren}, ephemeralOwner=${nodeStat.getEphemeralOwner}""");
>     val nodeCache = new NodeCache(client, nodeName);
>     assert(nodeCache != null, s"""Got null nodeCache(client, nodeName=${nodeName})""");
>     val currentData = nodeCache.getCurrentData;
>     assert(currentData != null , s"""getCurrentData was null from nodeName=${nodeName}""");
>     val childStat = currentData.getStat;
>     assert(childStat != null);
>     logger.info(s"""got childStat = ${childStat}""");
>     val childData = currentData.getData;
>     val childDataStr = new String(childData);
>     assert(childDataStr == testData);
>     val childPath = currentData.getPath;
>     assert(path == nodeName);
>     nodeCache.close;
>   }
> }
>
> When I run it, I see that the get data succeeds but the getCurrentData on the
> node cache failed.
> *** Start sanitized output****
> [Classpath dump suppressed]
> Testing started at 5:03 PM ...
> 17:03:43,852 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
> 17:03:43,852 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
> 17:03:43,968 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
> 17:03:43,971 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
> 17:03:44,033 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [com.myCompany.myProject] to DEBUG
> 17:03:44,033 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to ERROR
> 17:03:44,033 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
> 17:03:44,034 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
> 17:03:44,035 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@54d9d12d - Registering current configuration as safe fallback point
> 2017:03:03:17:03:45.350 [ScalaTest-run-running-CuratorSuite] DEBUG c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:45 - getCuratorFramework has ensembleProvider.getConnectionString=127.0.0.1:31314
> 2017:03:03:17:03:45.387 [ScalaTest-run-running-CuratorSuite] INFO c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:53 - After building client before client.start
> 2017:03:03:17:03:45.403 [ScalaTest-run-running-CuratorSuite] INFO c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:55 - client.start finished
> 2017:03:03:17:03:45.436 [ScalaTest-run-running-CuratorSuite] INFO c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:62 - nodeData=This is some test data., nodeStat has czxid=4, mzxid=4, pzxid=4 ctime=1488589425430, mtime=1488589425430, version=0, aVersion=0, cVersion=0, dataLength=23, numChildren=0, ephemeralOwner=0
>
> null equaled null getCurrentData was null from nodeName=/_c_ad40bede-7898-4dac-a002-7005e3d501b1-APath0000000000
> ScalaTestFailureLocation: com.myCompany.myProject.test.CuratorSuite$$anonfun$1 at (CuratorSuite.scala:66)
> org.scalatest.exceptions.TestFailedException: null equaled null getCurrentData was null from nodeName=/_c_ad40bede-7898-4dac-a002-7005e3d501b1-APath0000000000
>   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
>   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
>   at com.myCompany.myProject.test.CuratorSuite$$anonfun$1.apply$mcV$sp(CuratorSuite.scala:66)
>   at com.myCompany.myProject.test.CuratorSuite$$anonfun$1.apply(CuratorSuite.scala:41)
>   at com.myCompany.myProject.test.CuratorSuite$$anonfun$1.apply(CuratorSuite.scala:41)
>   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
>   at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
>   at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
>   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
>   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
>   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
>   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
>   at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
>   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
>   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
>   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
>   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
>   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
>   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
>   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
>   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
>   at org.scalatest.Suite$class.run(Suite.scala:1147)
>   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
>   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
>   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
>   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
>   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
>   at com.myCompany.myProject.test.CuratorSuite.org$scalatest$BeforeAndAfterAll$$super$run(CuratorSuite.scala:15)
>   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
>   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
>   at com.myCompany.myProject.CuratorSuite.run(CuratorSuite.scala:15)
>   at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
>   at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
>   at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
>   at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
>   at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
>   at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
>   at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
>   at org.scalatest.tools.Runner$.run(Runner.scala:850)
>   at org.scalatest.tools.Runner.run(Runner.scala)
>   at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
>   at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
>
> c.m.m.t.CuratorSuite.beforeAll:33 - Test Server is running on Port 31314, testServerPort=31314, using tmpDir="/var/folders/wr/hnqlgq7s3gl_x6fx0684fx3m0000gn/T/1488589424181-0", connectionString="127.0.0.1:31314".
> 2017:03:03:17:03:45.460 [ScalaTest-run] INFO c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.afterAll:37 - after: Shutting down testServer
>
> Process finished with exit code 0
>
> ******End of sanitized output**********
>
> I'm able to work around this by dropping down to the CuratorFramework level,
> but it would be nice to learn what went wrong here.
>
> With best regards:
>
> Bill
