Hello All:

I'm trying to figure out why, when using the CuratorFramework and NodeCache, I 
can read from a path in zookeeper/curator at the framework level, but NodeCache 
was somehow not able to read from the same path using the same CuratorFamework. 
Is the code below incorrect, I assumed that the path returned when creating a 
node would be suitable for NodeCache, and that the namespace of the 
CuratorFramework would be inherited by the derived NodeCache instance.


Please consider the following scala test:


package com.myCompany.myProject.test

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.cache.NodeCache
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.{TestingCluster, TestingServer}
import org.apache.zookeeper.CreateMode
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.slf4j.LoggerFactory
import collection.JavaConverters._

/**
 * Created by FoolishEwe on 2/16/17.
 */
class CuratorSuite extends FunSuite  with BeforeAndAfterAll {

  @transient
  val logger = LoggerFactory.getLogger(this.getClass);
  val nameSpace="testingNameSpace";
  val testServerPort = 31314;
  val testServer : TestingServer = new TestingServer(testServerPort);
  val connectionString= testServer.getConnectString;
  val retryTimeOutMsec = 1000;
  val connectionTimeOutMsec = 1000;
  val closeWaitTimeOutMsec = 1000;
  val sessionTimeOutMsec = 1000;
  val maxRetries = 3;
  val retryPolicy = new ExponentialBackoffRetry(retryTimeOutMsec, maxRetries);

  override def beforeAll() {
    val port = testServer.getPort;
    val tmpDir = testServer.getTempDirectory;
    logger.info(s"""Test Server is running on Port ${port}, 
testServerPort=${testServerPort}, using tmpDir="${tmpDir}", 
connectionString="${connectionString}".""");
  }

  override def afterAll() {
    logger.info(s"""after: Shutting down testServer""")
    testServer.close(); // shut it down and clean up
  }

  test("See if NodeCache actually finds the given node after creation by 
CuratorFramework") {
    val clientFactory = 
CuratorFrameworkFactory.builder().connectString(connectionString).namespace(nameSpace);
    // TODO: Should we be monitoring the ensemble?  For now let's pull the 
connection string from the ensemble,  I think we may be able to see if we have 
a stale connection string.
    val ensembleProvider = clientFactory.getEnsembleProvider; // Allows us to 
determine current set of active nodes participating in this cluster
    logger.debug( s"""getCuratorFramework has 
ensembleProvider.getConnectionString=${ensembleProvider.getConnectionString}""");
    val client = clientFactory.
      retryPolicy(retryPolicy).
      connectionTimeoutMs(connectionTimeOutMsec).
      maxCloseWaitMs(closeWaitTimeOutMsec).
      sessionTimeoutMs(sessionTimeOutMsec).
      namespace(nameSpace).
      build(); // create the client
    logger.info( s"""After building client before client.start""");
    client.start();
    logger.info( s"""client.start finished""");
    val path = "/APath";
    val testData = "This is some test data.";
    val nodeName = 
client.create.creatingParentContainersIfNeeded.withProtection.withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,
 testData.getBytes());
    val nodeStat = client.checkExists().forPath(nodeName);
    val nodeData = client.getData.forPath(nodeName);
    val nodeDataStr = new String(nodeData);
    logger.info(s"""nodeData=${nodeDataStr}, nodeStat has 
czxid=${nodeStat.getCzxid}, mzxid=${nodeStat.getMzxid}, 
pzxid=${nodeStat.getPzxid} ctime=${nodeStat.getCtime}, 
mtime=${nodeStat.getMtime}, version=${nodeStat.getVersion}, 
aVersion=${nodeStat.getAversion}, cVersion=${nodeStat.getCversion}, 
dataLength=${nodeStat.getDataLength}, numChildren=${nodeStat.getNumChildren}, 
ephemeralOwner=${nodeStat.getEphemeralOwner}""");
    val nodeCache = new NodeCache(client, nodeName);
    assert(nodeCache != null, s"""Got null nodeCache(client, 
nodeName=${nodeName})""");
    val currentData = nodeCache.getCurrentData;
    assert(currentData != null , s"""getCurrentData was null from 
nodeName=${nodeName}""");
    val childStat = currentData.getStat;
    assert(childStat != null);
    logger.info(s"""got childStat = ${childStat}""");
    val childData = currentData.getData;
    val childDataStr = new String(childData);
    assert(childDataStr == testData);
    val childPath = currentData.getPath;
    assert(path == nodeName);
    nodeCache.close;
  }

}


When I run it, I see that the get data succeeds but the getCurrentData on the 
node cache failed.

*** Start sanitized output****

[Classpath dump suppressed]

Testing started at 5:03 PM ...

17:03:43,852 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could 
NOT find resource [logback.groovy]

17:03:43,852 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could 
NOT find resource [logback-test.xml]

17:03:43,968 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About 
to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]

17:03:43,971 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming 
appender as [STDOUT]

17:03:44,033 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - 
Setting level of logger [com.myCompany.myProject] to DEBUG

17:03:44,033 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - 
Setting level of ROOT logger to ERROR

17:03:44,033 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - 
Attaching appender named [STDOUT] to Logger[ROOT]

17:03:44,034 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction 
- End of configuration.

17:03:44,035 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@54d9d12d 
- Registering current configuration as safe fallback 
point2017:03:03:17:03:45.350 [ScalaTest-run-running-CuratorSuite] DEBUG 
c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:45 - getCuratorFramework 
has ensembleProvider.getConnectionString=127.0.0.1:31314

2017:03:03:17:03:45.387 [ScalaTest-run-running-CuratorSuite] INFO  
c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:53 - After building client 
before client.start

2017:03:03:17:03:45.403 [ScalaTest-run-running-CuratorSuite] INFO  
c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:55 - client.start finished

2017:03:03:17:03:45.436 [ScalaTest-run-running-CuratorSuite] INFO  
c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.apply$mcV$sp:62 - nodeData=This is some 
test data., nodeStat has czxid=4, mzxid=4, pzxid=4 ctime=1488589425430, 
mtime=1488589425430, version=0, aVersion=0, cVersion=0, dataLength=23, 
numChildren=0, ephemeralOwner=0


null equaled null getCurrentData was null from 
nodeName=/_c_ad40bede-7898-4dac-a002-7005e3d501b1-APath0000000000

ScalaTestFailureLocation: com.myCompany.myProject.test.CuratorSuite$$anonfun$1 
at (CuratorSuite.scala:66)

org.scalatest.exceptions.TestFailedException: null equaled null getCurrentData 
was null from nodeName=/_c_ad40bede-7898-4dac-a002-7005e3d501b1-APath0000000000

at 
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)

at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)

at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)

at 
com.myCompany.myProject.test.CuratorSuite$$anonfun$1.apply$mcV$sp(CuratorSuite.scala:66)

at 
com.myCompany.myProject.test.CuratorSuite$$anonfun$1.apply(CuratorSuite.scala:41)

at 
com.myCompany.myProject.test.CuratorSuite$$anonfun$1.apply(CuratorSuite.scala:41)

at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)

at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)

at org.scalatest.Transformer.apply(Transformer.scala:22)

at org.scalatest.Transformer.apply(Transformer.scala:20)

at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)

at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)

at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)

at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)

at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)

at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)

at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)

at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)

at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)

at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)

at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)

at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)

at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)

at scala.collection.immutable.List.foreach(List.scala:318)

at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)

at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)

at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)

at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)

at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)

at org.scalatest.Suite$class.run(Suite.scala:1147)

at 
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)

at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)

at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)

at org.scalatest.SuperEngine.runImpl(Engine.scala:521)

at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)

at 
com.myCompany.myProject.test.CuratorSuite.org$scalatest$BeforeAndAfterAll$$super$run(CuratorSuite.scala:15)

at 
org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)

at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)

at com.myCompany.myProject.CuratorSuite.run(CuratorSuite.scala:15)

at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)

at 
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)

at 
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)

at scala.collection.immutable.List.foreach(List.scala:318)

at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)

at 
org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)

at 
org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)

at 
org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)

at 
org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)

at org.scalatest.tools.Runner$.run(Runner.scala:850)

at org.scalatest.tools.Runner.run(Runner.scala)

at 
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)

at 
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)



c.m.m.t.CuratorSuite.beforeAll:33 - Test Server is running on Port 31314, 
testServerPort=31314, using 
tmpDir="/var/folders/wr/hnqlgq7s3gl_x6fx0684fx3m0000gn/T/1488589424181-0", 
connectionString="127.0.0.1:31314".

2017:03:03:17:03:45.460 [ScalaTest-run] INFO  
c.a.g.r.l.c.c.a.g.r.l.c.t.CuratorSuite.afterAll:37 - after: Shutting down 
testServer


Process finished with exit code 0

******End of sanitized output**********

I'm able to work around this by dropping down to the CuaratorFramework level, 
but it would be nice to learn what went wrong here.

With best regards:

Bill

Reply via email to