Looks like the design of the connector will work for me. I will give it a try.
On Tue, Jul 17, 2018 at 4:59 PM Jason Huynh <[email protected]> wrote:

> Not sure if this helps any, but there used to be a Geode Spark Connector
> module that was removed with ticket GEODE-194. I believe someone has
> adopted/‘forked’ the code and is maintaining it here:
> https://github.com/Pivotal-Field-Engineering/geode-spark-connector
>
> I think the connection with workers issue was addressed in the connector.
> Maybe looking at that code might help with what you are doing? (or even
> using the connector as is, although I am not sure what Spark version it is
> compatible with)
>
> On Tue, Jul 17, 2018 at 2:47 PM trung kien <[email protected]> wrote:
>
>> Ah, that makes sense.
>> I will give it a try and let you know if it works.
>>
>> But my question on the Client Cache Pool still stands:
>> will the pool be shared among workers?
>> Can worker #1 close the cache of worker #2?
>>
>> On Tue, Jul 17, 2018 at 4:24 PM Bradford Boyle <[email protected]> wrote:
>>
>>> Hi Trung,
>>>
>>> One pattern that comes to mind is to create a `CacheProvider` object
>>> that lazily creates the cache and region objects:
>>>
>>> ```scala
>>> object CacheProvider {
>>>   lazy val cache = new ClientCacheFactory()...
>>>   lazy val region = ....
>>> }
>>> ```
>>>
>>> Then update your `foreachPartition` function to use this object
>>>
>>> ```scala
>>> records => {
>>>   val cache = CacheProvider.cache
>>>   val region = CacheProvider.region
>>>   ...
>>> ```
>>>
>>> This way the cache is not created until it is needed on the worker.
>>> You'll want to delay closing the cache until your Spark application
>>> ends; there are Spark event listeners you can register to clean up
>>> resources. I have not tested this code myself, so it might need some
>>> tweaking.
>>>
>>> --Bradford
>>>
>>> On Tue, Jul 17, 2018 at 2:10 PM trung kien <[email protected]> wrote:
>>> >
>>> > Hi John,
>>> >
>>> > The problem is that outside of the rdd.foreachPartition block, the code is not executed by a single worker.
>>> > In Spark Streaming, each partition will be handled by a worker in parallel.
>>> > So multiple workers will each run their own processing(region,records) at the same time.
>>> >
>>> > If I open the connection outside of that block, it's not passed through to the workers.
>>> > The recommended way is to create the connection inside that block:
>>> > http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html (Design Patterns for using foreachRDD)
>>> >
>>> > dstream.foreachRDD { rdd =>
>>> >   rdd.foreachPartition { partitionOfRecords =>
>>> >     val connection = createNewConnection()
>>> >     partitionOfRecords.foreach(record => connection.send(record))
>>> >     connection.close()
>>> >   }
>>> > }
>>> >
>>> > What I don't understand is that we seem to create the connection locally, creating and then closing it in the same block.
>>> > Is there any chance that the connection will be re-used by other workers (on other machines)?
>>> > For example, I have 2 workers running in parallel; both of them will create a connection pool to the locator at pretty much the same time.
>>> > Is the connection pool shared among all my workers, so that the close on worker #1 will impact the connection on worker #2?
>>> >
>>> > On Tue, Jul 17, 2018 at 11:55 AM John Blum <[email protected]> wrote:
>>> >>
>>> >> Hi Trung-
>>> >>
>>> >> You definitely should not be opening and closing a connection each time you process a record; that is excessive.
>>> >>
>>> >> I can tell you right now that, between clientCache.close(), say, in the first iteration, and ClientCacheFactory.create() in the second iteration, it is unlikely that Geode completely "closed" the ClientCache instance (releasing all the resources) before the ClientCache instance is recreated in the subsequent iteration, hence the reason you hit a CacheClosedException or RegionExistsException.
>>> >>
>>> >> I have run into similar problems in a single test class, where I need to cycle the cache instance between test case methods in the same test class (suite).
>>> >>
>>> >> Having said that, it would be better to structure the code as...
>>> >>
>>> >> ClientCache cache = new ClientCacheFactory()
>>> >>     .addPoolLocator(locator, 10334)
>>> >>     .set("log-level", "WARN")
>>> >>     .create();
>>> >>
>>> >> Region<String, Transaction> region =
>>> >>     cache.<String, Transaction>createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>>> >>
>>> >> try {
>>> >>     results.foreachRDD(
>>> >>         rdd -> rdd.foreachPartition(
>>> >>             records -> {
>>> >>                 processing(region,records);
>>> >>             }
>>> >>         )
>>> >>     );
>>> >> }
>>> >> finally {
>>> >>     cache.close();
>>> >> }
>>> >>
>>> >> Why do you need to open/close the connection for each record/batch anyhow?
>>> >>
>>> >> Regards,
>>> >> -John
>>> >>
>>> >> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <[email protected]> wrote:
>>> >>>
>>> >>> Dear all,
>>> >>>
>>> >>> I am looking for a correct way to write the output of a Spark Streaming job to a Geode region.
>>> >>>
>>> >>> I'm trying the following code:
>>> >>>
>>> >>> results.foreachRDD(
>>> >>>     rdd -> rdd.foreachPartition(
>>> >>>         records ->
>>> >>>         {
>>> >>>             //STEP 1
>>> >>>             ClientCache cache = new ClientCacheFactory().addPoolLocator(locator, 10334)
>>> >>>                 .set("log-level", "WARN").create();
>>> >>>
>>> >>>             //STEP 2
>>> >>>             Region<String, Transaction> region = cache.<String, Transaction>createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>>> >>>
>>> >>>             //STEP 3
>>> >>>             processing(region,records);
>>> >>>
>>> >>>             //STEP 4
>>> >>>             cache.close();
>>> >>>         }
>>> >>>     )
>>> >>> );
>>> >>>
>>> >>> 1/ If I use the above code, I get the following errors:
>>> >>>     org.apache.geode.cache.CacheClosedException: The cache is closed.
>>> >>>     org.apache.geode.cache.RegionExistsException: /transactions
>>> >>>
>>> >>>     I think Spark is trying to re-use the connection to Geode; in the second batch, when the connection is already closed, it raises exceptions.
>>> >>>
>>> >>> 2/ I comment out //STEP 4 (cache.close())
>>> >>>     The Spark job runs BUT THE DATA IS MISSING RANDOMLY
>>> >>>
>>> >>> 3/ I wrote a standalone application to repeat steps 1 -> 4 in a while loop
>>> >>>     while(true) {
>>> >>>         //STEP 1
>>> >>>         //STEP 2
>>> >>>         //STEP 3
>>> >>>         //STEP 4
>>> >>>     }
>>> >>>
>>> >>>     The application runs well, without exceptions or losing data.
>>> >>>
>>> >>> Any thoughts are really appreciated
>>> >>>
>>> >>> --
>>> >>> Thanks
>>> >>> Kien
>>> >>
>>> >> --
>>> >> -John
>>> >> john.blum10101 (skype)
>>> >
>>> > --
>>> > Thanks
>>> > Kien
>>>
>> --
>> Thanks
>> Kien
>>
>
--
Thanks
Kien
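
A note on the pool-sharing question raised in the thread: on a cluster, each Spark executor normally runs in its own JVM. A lazily created cache held in a static field (the `CacheProvider` idea quoted above) would therefore be shared by the tasks on one executor, but not across executors on other machines. The plain-Java sketch below illustrates that per-JVM lazy holder. It has not been run against Geode or Spark. `PerJvmResource`, `FakeCache`, and `LazyCacheDemo` are hypothetical names, and `FakeCache` is only a stand-in so the example runs without Geode on the classpath. In a real job the factory would presumably be the `ClientCacheFactory` chain from the thread. The sketch also uses a plain JVM shutdown hook instead of the Spark event listeners Bradford mentions.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical holder: creates one instance per JVM on first use. */
final class PerJvmResource<T> {
    private final Supplier<T> factory;
    private final Consumer<T> closer;
    private T instance; // guarded by "this"

    PerJvmResource(Supplier<T> factory, Consumer<T> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    /** Returns the shared instance, creating it on the first call only. */
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    /** Closes the instance if one was created; meant for application shutdown. */
    synchronized void close() {
        if (instance != null) {
            closer.accept(instance);
            instance = null;
        }
    }
}

/** Stand-in for a Geode ClientCache (assumption: lets the sketch run without Geode). */
final class FakeCache {
    static final AtomicInteger created = new AtomicInteger();
    volatile boolean closed = false;

    FakeCache() { created.incrementAndGet(); }

    void close() { closed = true; }
}

final class LazyCacheDemo {
    // One holder per JVM, analogous to the Scala `object CacheProvider`.
    static final PerJvmResource<FakeCache> CACHE =
        new PerJvmResource<>(FakeCache::new, FakeCache::close);

    /** Stands in for the foreachPartition body: borrow the cache, never close it here. */
    static FakeCache processPartition() {
        return CACHE.get();
    }

    public static void main(String[] args) {
        // Close once, when the JVM exits, rather than after each partition.
        Runtime.getRuntime().addShutdownHook(new Thread(CACHE::close));

        FakeCache first = processPartition();
        FakeCache second = processPartition();
        System.out.println("same instance: " + (first == second));
        System.out.println("caches created: " + FakeCache.created.get());
    }
}
```

The holder deliberately keeps `close()` out of the per-partition path. Because tasks on the same executor would share the instance, closing it in one task could plausibly surface as a `CacheClosedException` in another, which matches the symptom reported at the start of the thread.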
