Hi Trung-
You definitely should not be opening and closing a connection each time you
process a record; that is excessive.
I can tell you right now that, between clientCache.close() in, say, the
first iteration and ClientCacheFactory.create() in the second iteration,
Geode has most likely not finished "closing" the ClientCache instance
(releasing all of its resources) before the ClientCache instance is recreated
in the subsequent iteration. That is why you hit a CacheClosedException
or a RegionExistsException.
I have run into similar problems in a single test class, where I need to
cycle the cache instance between test case methods in the same test class
(suite).
Having said that, it would be better to structure the code as...
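    ClientCache cache = new ClientCacheFactory()
        .addPoolLocator(locator, 10334)
        .set("log-level", "WARN")
        .create();

    Region<String, Transaction> region =
        cache.<String, Transaction>createClientRegionFactory(
            ClientRegionShortcut.PROXY).create("transactions");

    try {
        // Caution: foreachPartition serializes this closure and ships it to
        // the executors; Region is not expected to be Serializable, so
        // capturing 'region' here may fail with "Task not serializable".
        results.foreachRDD(
            rdd -> rdd.foreachPartition(
                records -> {
                    processing(region, records);
                }
            )
        );
        // foreachRDD only registers the output operation. The streaming
        // context must be started and awaited inside this try block (e.g.
        // ssc.start(); ssc.awaitTermination(); for an assumed
        // JavaStreamingContext named ssc), otherwise the finally block
        // closes the cache before any batch runs.
    }
    finally {
        cache.close();
    }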
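Because the partition closure runs on the executors, one common way to get "create once, reuse everywhere" without shipping the cache from the driver is a lazily initialized, per-JVM holder. The sketch below shows that idea using only the JDK. PerJvmResource and FakeCache are hypothetical names, FakeCache stands in for ClientCache, and in real code the supplier would wrap the ClientCacheFactory ... create() call shown above. This is a sketch under those assumptions, not a Geode or Spark API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hypothetical helper: creates one shared resource per JVM on first use and
 * reuses it for every later partition processed in that JVM.
 */
final class PerJvmResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private volatile T instance;

    PerJvmResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it exactly once (double-checked locking). */
    T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    local = factory.get();
                    instance = local;
                    // Close the resource once, when this JVM shuts down.
                    final T toClose = local;
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        try { toClose.close(); } catch (Exception ignored) { }
                    }));
                }
            }
        }
        return local;
    }
}

public class Main {
    // Counts how many "caches" have been created.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical stand-in for ClientCache.
    static final class FakeCache implements AutoCloseable {
        FakeCache() { CREATED.incrementAndGet(); }
        @Override public void close() { }
    }

    // Static fields are not captured by a serialized lambda, so each JVM
    // initializes its own holder when the class is loaded there.
    static final PerJvmResource<FakeCache> CACHE = new PerJvmResource<>(FakeCache::new);

    public static void main(String[] args) {
        // Simulate three partitions/batches processed in the same JVM.
        for (int partition = 0; partition < 3; partition++) {
            FakeCache cache = CACHE.get();
            // processing(region, records) would go here, using 'cache'
        }
        System.out.println("caches created: " + CREATED.get()); // prints: caches created: 1
    }
}
```

With this pattern the partition closure calls the holder instead of creating and closing a cache itself, so each executor JVM opens one connection and closes it only at JVM shutdown.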
Why do you need to open/close the connection for each record/batch anyhow?
Regards,
-John
On Mon, Jul 16, 2018 at 11:15 PM, trung kien wrote:
> Dear all,
>
> I am looking for the correct way to write the output of a Spark Streaming
> job to a Geode region.
>
> I'm trying the following code:
>
> results.foreachRDD(
>     rdd -> rdd.foreachPartition(
>         records -> {
>             //STEP 1
>             ClientCache cache = new ClientCacheFactory()
>                 .addPoolLocator(locator, 10334)
>                 .set("log-level", "WARN").create();
>
>             //STEP 2
>             Region<String, Transaction> region =
>                 cache.<String, Transaction>createClientRegionFactory(
>                     ClientRegionShortcut.PROXY).create("transactions");
>
>             //STEP 3
>             processing(region, records);
>
>             //STEP 4
>             cache.close();
>         }
>     )
> );
>
> 1/ If I use the above code, I get the following errors:
>     org.apache.geode.cache.CacheClosedException: The cache is closed.
>     org.apache.geode.cache.RegionExistsException: /transactions
>
> I think Spark is trying to reuse the connection to Geode; in the
> second batch, when the connection is already closed, it raises exceptions.
>
> 2/ If I comment out //STEP 4 (cache.close()),
> the Spark job runs, BUT THE DATA IS MISSING RANDOMLY.
>
> 3/ I wrote a standalone application that repeats steps 1 -> 4 in a while loop:
> while (true) {
>     //STEP 1
>     //STEP 2
>     //STEP 3
>     //STEP 4
> }
>
> The application runs well, without exceptions or losing
> data.
>
>
> Any thoughts are really appreciated.
>
>
>
>
> --
> Thanks
> Kien
>
--
-John
john.blum10101 (skype)