Looks like the design of the connector will work for me.

I will give it a try.
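
For reference, a minimal Java sketch of the lazy per-JVM holder idea from Bradford's reply below. It has not been run against Geode or Spark: `FakeClient` is a hypothetical stand-in for the `ClientCache` plus the "transactions" region, and `ClientHolder` and `PartitionWriter` are made-up names. In a real job the `FakeClient` construction would presumably be replaced by the `ClientCacheFactory` calls quoted further down.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Geode ClientCache and its "transactions" region.
final class FakeClient implements AutoCloseable {
    // Counts constructions so we can check that only one client is ever built.
    static final AtomicInteger CREATED = new AtomicInteger();

    private final Map<String, String> region = new ConcurrentHashMap<>();
    private volatile boolean closed;

    FakeClient() {
        CREATED.incrementAndGet();
    }

    void put(String key, String value) {
        if (closed) {
            throw new IllegalStateException("client is closed");
        }
        region.put(key, value);
    }

    int size() {
        return region.size();
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Holds one lazily created client per JVM, i.e. per Spark executor process.
final class ClientHolder {
    private ClientHolder() {}

    // Initialization-on-demand holder: the JVM runs this initializer once,
    // thread-safely, the first time Lazy.CLIENT is touched.
    private static final class Lazy {
        static final FakeClient CLIENT = new FakeClient();

        static {
            // Assumption: closing at JVM exit is acceptable for this job.
            Runtime.getRuntime().addShutdownHook(new Thread(CLIENT::close));
        }
    }

    static FakeClient client() {
        return Lazy.CLIENT;
    }
}

// Body of the foreachPartition lambda, pulled out so it can be called directly.
final class PartitionWriter {
    private PartitionWriter() {}

    static void writePartition(Iterator<Map.Entry<String, String>> records) {
        FakeClient client = ClientHolder.client();
        while (records.hasNext()) {
            Map.Entry<String, String> record = records.next();
            client.put(record.getKey(), record.getValue());
        }
        // Deliberately no close() here: other tasks in this JVM share the client.
    }
}
```

Static state lives per JVM, so executors on different machines each get their own client and cannot close one another's. Tasks running on the same executor do share it, which is why the sketch leaves `close()` out of the partition block.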

On Tue, Jul 17, 2018 at 4:59 PM Jason Huynh <[email protected]> wrote:

> Not sure if this helps, but there used to be a Geode Spark Connector
> module that was removed with ticket GEODE-194.  I believe someone has
> adopted/‘forked’ the code and is maintaining it here:
> https://github.com/Pivotal-Field-Engineering/geode-spark-connector
>
>
> I think the connection with workers issue was addressed in the connector.
> Maybe looking at that code might help with what you are doing? (or even
> using the connector as is, although I am not sure what Spark version it is
> compatible with)
>
> On Tue, Jul 17, 2018 at 2:47 PM trung kien <[email protected]> wrote:
>
>> Ah, that makes sense.
>> I will give it a try and let you know if it works.
>>
>> But my question about the client cache pool still stands:
>> is the pool shared among workers?
>> Can worker #1 close the cache of worker #2?
>>
>> On Tue, Jul 17, 2018 at 4:24 PM Bradford Boyle <[email protected]> wrote:
>>
>>> Hi Trung,
>>>
>>> One pattern that comes to mind is to create a `CacheProvider` object
>>> that lazily creates the cache and region objects:
>>>
>>> ```scala
>>> object CacheProvider {
>>> lazy val cache = new ClientCacheFactory()...
>>> lazy val region = ....
>>> }
>>> ```
>>>
>>> Then update your `foreachPartition` function to use this object
>>>
>>> ```scala
>>> records => {
>>>   val cache = CacheProvider.cache
>>>   val region = CacheProvider.region
>>>   ...
>>> }
>>> ```
>>>
>>> This way the cache is not created until it is needed on the worker.
>>> You'll want to delay closing the cache until your Spark application
>>> ends; there are Spark event listeners you can register to clean up
>>> resources. I have not tested this code myself, so it might need some
>>> tweaking.
>>>
>>> --Bradford
>>>
>>>
>>> On Tue, Jul 17, 2018 at 2:10 PM trung kien <[email protected]> wrote:
>>> >
>>> > Hi John,
>>> >
>>> > The problem is that outside of the rdd.foreachPartition block, the code
>>> > is not executed by a single worker.
>>> > In Spark Streaming, each partition is handled by a worker in parallel,
>>> > so multiple workers run their own processing(region,records) at the
>>> > same time.
>>> >
>>> > If I open the connection outside of that block, it is not passed
>>> > through to the workers.
>>> > The recommended way is to create the connection inside that block:
>>> > http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html
>>> > (Design Patterns for using foreachRDD)
>>> >
>>> > dstream.foreachRDD { rdd =>
>>> >   rdd.foreachPartition { partitionOfRecords =>
>>> >     val connection = createNewConnection()
>>> >     partitionOfRecords.foreach(record => connection.send(record))
>>> >     connection.close()
>>> >   }
>>> > }
>>> >
>>> > What I don't understand is that we seem to create the connection
>>> > locally: it is created and then closed in the same block.
>>> > Is there any chance that the connection will be re-used by other
>>> > workers (on other machines)?
>>> > For example, I have 2 workers running in parallel, and both of them
>>> > create a connection pool to the locator at pretty much the same time.
>>> > Is the connection pool shared among all my workers, so that the close
>>> > on worker #1 will impact the connection on worker #2?
>>> >
>>> > On Tue, Jul 17, 2018 at 11:55 AM John Blum <[email protected]> wrote:
>>> >>
>>> >> Hi Trung-
>>> >>
>>> >> You definitely should not be opening and closing a connection each
>>> >> time you process a record; that is excessive.
>>> >>
>>> >> I can tell you right now that, between clientCache.close(), say, in
>>> >> the first iteration, and ClientCacheFactory.create() in the second
>>> >> iteration, it is unlikely that Geode completely "closed" the ClientCache
>>> >> instance (releasing all the resources) before the ClientCache instance is
>>> >> recreated in the subsequent iteration, hence the reason you hit a
>>> >> CacheClosedException or RegionExistsException.
>>> >>
>>> >> I have run into similar problems in a single test class, where I need
>>> >> to cycle the cache instance between test case methods in the same test
>>> >> class (suite).
>>> >>
>>> >> Having said that, it would be better to structure the code as...
>>> >>
>>> >> ClientCache cache = new ClientCacheFactory()
>>> >>     .addPoolLocator(locator, 10334)
>>> >>     .set("log-level", "WARN")
>>> >>     .create();
>>> >>
>>> >> Region<String, Transaction> region =
>>> >>     cache.<String, Transaction>createClientRegionFactory(ClientRegionShortcut.PROXY)
>>> >>         .create("transactions");
>>> >>
>>> >> try {
>>> >>
>>> >>     results.foreachRDD(
>>> >>         rdd -> rdd.foreachPartition(
>>> >>             records -> {
>>> >>                 processing(region, records);
>>> >>             }
>>> >>         )
>>> >>     );
>>> >> }
>>> >> finally {
>>> >>     cache.close();
>>> >> }
>>> >>
>>> >> Why do you need to open/close the connection for each record/batch
>>> >> anyhow?
>>> >>
>>> >> Regards,
>>> >> -John
>>> >>
>>> >>
>>> >> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <[email protected]> wrote:
>>> >>>
>>> >>> Dear all,
>>> >>>
>>> >>> I am looking for the correct way to write the output of a Spark
>>> >>> Streaming job to a Geode region.
>>> >>>
>>> >>> I'm trying the following code:
>>> >>>
>>> >>> results.foreachRDD(
>>> >>>     rdd -> rdd.foreachPartition(
>>> >>>         records -> {
>>> >>>             //STEP 1
>>> >>>             ClientCache cache = new ClientCacheFactory()
>>> >>>                 .addPoolLocator(locator, 10334)
>>> >>>                 .set("log-level", "WARN")
>>> >>>                 .create();
>>> >>>
>>> >>>             //STEP 2
>>> >>>             Region<String, Transaction> region =
>>> >>>                 cache.<String, Transaction>createClientRegionFactory(ClientRegionShortcut.PROXY)
>>> >>>                     .create("transactions");
>>> >>>
>>> >>>             //STEP 3
>>> >>>             processing(region, records);
>>> >>>
>>> >>>             //STEP 4
>>> >>>             cache.close();
>>> >>>         }
>>> >>>     )
>>> >>> );
>>> >>>
>>> >>> 1/ If I use the above code, I get the following errors:
>>> >>>                org.apache.geode.cache.CacheClosedException: The cache is closed.
>>> >>>                org.apache.geode.cache.RegionExistsException: /transactions
>>> >>>
>>> >>>         I think Spark is trying to re-use the connection to Geode; in
>>> >>>         the second batch, when the connection is already closed, it
>>> >>>         raises exceptions.
>>> >>>
>>> >>> 2/ I comment out //STEP 4 (cache.close()):
>>> >>>               The Spark job runs BUT THE DATA IS MISSING RANDOMLY
>>> >>>
>>> >>> 3/ I wrote a standalone application that repeats steps 1 -> 4 in a
>>> >>>    while loop:
>>> >>>               while(true) {
>>> >>>                    //STEP 1,
>>> >>>                    //STEP 2
>>> >>>                    //STEP 3
>>> >>>                    //STEP 4
>>> >>>               }
>>> >>>
>>> >>>           The application runs well, without exceptions or lost
>>> >>>           data.
>>> >>>
>>> >>>
>>> >>> Any thoughts are really appreciated.
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> Thanks
>>> >>> Kien
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> -John
>>> >> john.blum10101 (skype)
>>> >
>>> > --
>>> > Thanks
>>> > Kien
>>>
>> --
>> Thanks
>> Kien
>>
> --
Thanks
Kien
