Ah, that make sense.
i will give it a try and let you know if it’s work.

But my question on Client Cache Pool still stand.
Wherether the pool will be shared among workers.
Can worker #1 can close the cache of worker #2.

On Tue, Jul 17, 2018 at 4:24 PM Bradford Boyle <[email protected]> wrote:

> Hi Trung,
>
> One pattern that comes to mind is to create a `CacheProvider` object
> that lazily creates the cache and region objects:
>
> ```scala
> object CacheProvider {
> lazy val cache = new ClientCacheFactory()...
> lazy val region = ....
> }
> ```
>
> Then update your `foreachPartition` function to use this object
>
> ```scala
> records -> {
>   val cache = CacheProvider.cache
>   val region = CacheProvider.region
>   ...
> ```
>
> This way the cache is not created until it is needed on the worker.
> You'll want to delay closing the cache until your Spark application
> ends; there are Spark event listeners you can register to cleanup
> resources. I have not tested this code myself, so it might need some
> tweaking.
>
> --Bradford
>
>
> On Tue, Jul 17, 2018 at 2:10 PM trung kien <[email protected]> wrote:
> >
> > Hi John,
> >
> > The problem is that outside of rdd.foreachPartition block, the code is
> not executed by single worker.
> > In Spark streaming, each parition will be handle by a worker in parallel.
> > So multiple workers will work on its own processing(region,records) at
> the same time.
> >
> > If I open connection outside of that block, it's not passed through
> workers.
> > The recommend way is creating connection inside that block
> > http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html
> (Design Patterns for using foreachRDD)
> >
> > dstream.foreachRDD { rdd =>
> >   rdd.foreachPartition { partitionOfRecords =>
> >     val connection = createNewConnection()
> >     partitionOfRecords.foreach(record => connection.send(record))
> >     connection.close()
> >   }
> > }
> >
> > What's I don't understand is we seems to create the connection locally,
> create then close in the same block.
> > Is there any chance that the connection will be re-used by other workers
> (on other machines)?
> > For example, I have 2 workers running in parallel all of them will
> create connection pool to locator at pretty the same time
> > Is the connection pool shared among all my workers? so the close on
> worker #1 will impact the connection on worker #2.
> >
> > On Tue, Jul 17, 2018 at 11:55 AM John Blum <[email protected]> wrote:
> >>
> >> Hi Trung-
> >>
> >> You definitely should not be opening and closing a connection each time
> you process a record; that is excessive.
> >>
> >> I can tell you right now that, between clientCache.close(), say, in the
> first iteration, and ClientCacheFactory.create() in the second iteration,
> it is unlikely that Geode completely "closed" the ClientCache instance
> (releasing all the resources) before the ClientCache instance is recreated
> in the subsequent iteration, hence the reason you hit a
> CacheClosedException or RegionExistsException.
> >>
> >> I have run into similar problems in a single test class, where I need
> to cycle the cache instance between test case methods in the same test
> class (suite).
> >>
> >> Having said that, it would be better to structure the code as...
> >>
> >> ClientCache cache = new ClientCacheFactory()
> >>     .addPoolLocator(locator, 10334)
> >>     .set("log-level", "WARN")
> >>     .create();
> >>
> >> Region< String, Transaction > region =
> >>     cache.<String, Transaction
> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
> >>
> >> try {
> >>
> >>     results.foreachRDD(
> >>         rdd -> rdd.foreachPartition(
> >>             records -> {
> >>                 processing(region,records)
> >>             }
> >>         )
> >>     );
> >> }
> >> finally {
> >>     cache.close();
> >> }
> >>
> >> Why do you need to open/close the connection for each record/batch
> anyhow?
> >>
> >> Regards,
> >> -John
> >>
> >>
> >> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <[email protected]>
> wrote:
> >>>
> >>> Dear all,
> >>>
> >>> I am looking for a correct way to write output of a spark streaming
> job to a geode's region
> >>>
> >>> I'm trying following codes:
> >>>
> >>> results.foreachRDD(
> >>>                     rdd -> rdd.foreachPartition(
> >>>                             records ->
> >>>                             {
> >>>                                     //STEP 1
> >>>                                     ClientCache cache = new
> ClientCacheFactory().addPoolLocator(locator, 10334)
> >>>                                     .set("log-level", "WARN").create();
> >>>
> >>>                                     //STEP 2
> >>>                                     Region< String, Transaction >
> region = cache.<String, Transaction
> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
> >>>
> >>>                                     //STEP 3
> >>>                                     processing(region,records)
> >>>
> >>>                                     //STEP 4
> >>>                                     cache.close();
> >>>                             }
> >>>                     )
> >>>                 );
> >>>
> >>> 1/ If I'm using above code, then I will get following errors:
> >>>                org.apache.geode.cache.CacheClosedException: The cache
> is closed.
> >>>                org.apache.geode.cache.RegionExistsException:
> /transactions
> >>>
> >>>         I think Spark is trying to re-use the connection to geode, in
> second batch when connection is already closed it raise exceptions
> >>>
> >>> 2/ I comment out the //STEP 4 (cache.close())
> >>>               The spark job running BUT THE DATA IS MISSING RANDOMLY
> >>>
> >>> 3/ I'm writing stanalone application to repeat step 1 -> 4 in while
> loop
> >>>               while(true) {
> >>>                    //STEP 1,
> >>>                    //STEP 2
> >>>                    //STEP 3
> >>>                    //STEP 4
> >>>               }
> >>>
> >>>           The application is running well without exception or loosing
> data.
> >>>
> >>>
> >>> Any thoughts is really appriciated
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> Thanks
> >>> Kien
> >>
> >>
> >>
> >>
> >> --
> >> -John
> >> john.blum10101 (skype)
> >
> > --
> > Thanks
> > Kien
>
-- 
Thanks
Kien

Reply via email to