Ah, that makes sense. I will give it a try and let you know if it works. But my question about the client cache pool still stands: is the pool shared among workers? Can worker #1 close the cache of worker #2?
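For reference, here is a rough sketch of what I plan to try, adapted to Java from Bradford's suggestion below. It is only an illustration under assumptions: `FakeClientCache` is a made-up stand-in for the real `ClientCache` so the snippet runs without Geode, `CacheProviderSketch` is a hypothetical name, and I use a plain JVM shutdown hook instead of the Spark event listeners Bradford mentioned. It shows one lazily created instance shared by all tasks inside a single JVM; it says nothing about what happens across separate worker JVMs, which is the part I am still unsure about.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CacheProviderSketch {

    // Hypothetical stand-in for Geode's ClientCache, so the sketch runs without Geode.
    static final class FakeClientCache implements AutoCloseable {
        // Counts how many instances were created in this JVM.
        static final AtomicInteger CREATED = new AtomicInteger();
        private volatile boolean closed = false;

        FakeClientCache() {
            CREATED.incrementAndGet();
        }

        void put(String key, String value) {
            if (closed) {
                throw new IllegalStateException("The cache is closed.");
            }
            // A real client would write to the region here.
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Initialization-on-demand holder: the JVM initializes Holder once, on the
    // first call to get(), even if several threads call it at the same time.
    private static final class Holder {
        static final FakeClientCache CACHE = create();

        private static FakeClientCache create() {
            FakeClientCache cache = new FakeClientCache();
            // Assumption: close at JVM exit rather than after each partition.
            Runtime.getRuntime().addShutdownHook(new Thread(cache::close));
            return cache;
        }
    }

    static FakeClientCache get() {
        return Holder.CACHE;
    }

    public static void main(String[] args) throws InterruptedException {
        // Two threads stand in for two partition tasks running in the same JVM.
        Runnable partitionTask = () -> get().put("key", "value");
        Thread t1 = new Thread(partitionTask);
        Thread t2 = new Thread(partitionTask);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("caches created: " + FakeClientCache.CREATED.get());
    }
}
```

Inside `foreachPartition` I would then call `CacheProviderSketch.get()` instead of creating and closing a cache for every partition.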
On Tue, Jul 17, 2018 at 4:24 PM Bradford Boyle <[email protected]> wrote:

> Hi Trung,
>
> One pattern that comes to mind is to create a `CacheProvider` object
> that lazily creates the cache and region objects:
>
> ```scala
> object CacheProvider {
>   lazy val cache = new ClientCacheFactory()...
>   lazy val region = ....
> }
> ```
>
> Then update your `foreachPartition` function to use this object
>
> ```scala
> records -> {
>   val cache = CacheProvider.cache
>   val region = CacheProvider.region
>   ...
> ```
>
> This way the cache is not created until it is needed on the worker.
> You'll want to delay closing the cache until your Spark application
> ends; there are Spark event listeners you can register to clean up
> resources. I have not tested this code myself, so it might need some
> tweaking.
>
> --Bradford
>
>
> On Tue, Jul 17, 2018 at 2:10 PM trung kien <[email protected]> wrote:
> >
> > Hi John,
> >
> > The problem is that outside of the rdd.foreachPartition block, the code is not executed by a single worker.
> > In Spark Streaming, each partition will be handled by a worker in parallel.
> > So multiple workers will each run their own processing(region,records) at the same time.
> >
> > If I open the connection outside of that block, it's not passed through to the workers.
> > The recommended way is to create the connection inside that block:
> > http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html (Design Patterns for using foreachRDD)
> >
> > dstream.foreachRDD { rdd =>
> >   rdd.foreachPartition { partitionOfRecords =>
> >     val connection = createNewConnection()
> >     partitionOfRecords.foreach(record => connection.send(record))
> >     connection.close()
> >   }
> > }
> >
> > What I don't understand is that we seem to create the connection locally, creating and then closing it in the same block.
> > Is there any chance that the connection will be re-used by other workers (on other machines)?
> > For example, I have 2 workers running in parallel; both of them will create a connection pool to the locator at pretty much the same time.
> > Is the connection pool shared among all my workers, so that a close on worker #1 will impact the connection on worker #2?
> >
> > On Tue, Jul 17, 2018 at 11:55 AM John Blum <[email protected]> wrote:
> >>
> >> Hi Trung-
> >>
> >> You definitely should not be opening and closing a connection each time you process a record; that is excessive.
> >>
> >> I can tell you right now that, between clientCache.close(), say, in the first iteration, and ClientCacheFactory.create() in the second iteration, it is unlikely that Geode completely "closed" the ClientCache instance (releasing all the resources) before the ClientCache instance is recreated in the subsequent iteration, hence the reason you hit a CacheClosedException or RegionExistsException.
> >>
> >> I have run into similar problems in a single test class, where I need to cycle the cache instance between test case methods in the same test class (suite).
> >>
> >> Having said that, it would be better to structure the code as...
> >>
> >> ClientCache cache = new ClientCacheFactory()
> >>   .addPoolLocator(locator, 10334)
> >>   .set("log-level", "WARN")
> >>   .create();
> >>
> >> Region<String, Transaction> region =
> >>   cache.<String, Transaction>createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
> >>
> >> try {
> >>
> >>   results.foreachRDD(
> >>     rdd -> rdd.foreachPartition(
> >>       records -> {
> >>         processing(region,records);
> >>       }
> >>     )
> >>   );
> >> }
> >> finally {
> >>   cache.close();
> >> }
> >>
> >> Why do you need to open/close the connection for each record/batch anyhow?
> >>
> >> Regards,
> >> -John
> >>
> >>
> >> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <[email protected]> wrote:
> >>>
> >>> Dear all,
> >>>
> >>> I am looking for a correct way to write the output of a Spark Streaming job to a Geode region.
> >>>
> >>> I'm trying the following code:
> >>>
> >>> results.foreachRDD(
> >>>   rdd -> rdd.foreachPartition(
> >>>     records ->
> >>>     {
> >>>       //STEP 1
> >>>       ClientCache cache = new ClientCacheFactory().addPoolLocator(locator, 10334)
> >>>         .set("log-level", "WARN").create();
> >>>
> >>>       //STEP 2
> >>>       Region<String, Transaction> region = cache.<String, Transaction>createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
> >>>
> >>>       //STEP 3
> >>>       processing(region,records);
> >>>
> >>>       //STEP 4
> >>>       cache.close();
> >>>     }
> >>>   )
> >>> );
> >>>
> >>> 1/ If I use the above code, I get the following errors:
> >>>     org.apache.geode.cache.CacheClosedException: The cache is closed.
> >>>     org.apache.geode.cache.RegionExistsException: /transactions
> >>>
> >>> I think Spark is trying to re-use the connection to Geode; in the second batch, when the connection is already closed, it raises exceptions.
> >>>
> >>> 2/ I comment out //STEP 4 (cache.close())
> >>>     The Spark job runs BUT THE DATA IS MISSING RANDOMLY
> >>>
> >>> 3/ I wrote a standalone application that repeats steps 1 -> 4 in a while loop
> >>>     while(true) {
> >>>       //STEP 1,
> >>>       //STEP 2
> >>>       //STEP 3
> >>>       //STEP 4
> >>>     }
> >>>
> >>> The application runs well, without exceptions or losing data.
> >>>
> >>>
> >>> Any thoughts are really appreciated
> >>>
> >>>
> >>> --
> >>> Thanks
> >>> Kien
> >>
> >>
> >> --
> >> -John
> >> john.blum10101 (skype)
> >
> > --
> > Thanks
> > Kien
>
--
Thanks
Kien
