Not sure if this helps any, there used to be a Geode Spark Connector module
that was removed with ticket GEODE-194.  I believe someone has
adopted/‘forked’ the code and maintaining it here:
https://github.com/Pivotal-Field-Engineering/geode-spark-connector


I think the connection with workers issue was addressed in the
connector.  Maybe
looking at that code might help with what you are doing? (or even using the
connector as is, although I am not sure what Spark version it is compatible
with)

On Tue, Jul 17, 2018 at 2:47 PM trung kien <[email protected]> wrote:

> Ah, that make sense.
> i will give it a try and let you know if it’s work.
>
> But my question on Client Cache Pool still stand.
> Wherether the pool will be shared among workers.
> Can worker #1 can close the cache of worker #2.
>
> On Tue, Jul 17, 2018 at 4:24 PM Bradford Boyle <[email protected]> wrote:
>
>> Hi Trung,
>>
>> One pattern that comes to mind is to create a `CacheProvider` object
>> that lazily creates the cache and region objects:
>>
>> ```scala
>> object CacheProvider {
>> lazy val cache = new ClientCacheFactory()...
>> lazy val region = ....
>> }
>> ```
>>
>> Then update your `foreachPartition` function to use this object
>>
>> ```scala
>> records -> {
>>   val cache = CacheProvider.cache
>>   val region = CacheProvider.region
>>   ...
>> ```
>>
>> This way the cache is not created until it is needed on the worker.
>> You'll want to delay closing the cache until your Spark application
>> ends; there are Spark event listeners you can register to cleanup
>> resources. I have not tested this code myself, so it might need some
>> tweaking.
>>
>> --Bradford
>>
>>
>> On Tue, Jul 17, 2018 at 2:10 PM trung kien <[email protected]> wrote:
>> >
>> > Hi John,
>> >
>> > The problem is that outside of rdd.foreachPartition block, the code is
>> not executed by single worker.
>> > In Spark streaming, each parition will be handle by a worker in
>> parallel.
>> > So multiple workers will work on its own processing(region,records) at
>> the same time.
>> >
>> > If I open connection outside of that block, it's not passed through
>> workers.
>> > The recommend way is creating connection inside that block
>> > http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html
>> (Design Patterns for using foreachRDD)
>> >
>> > dstream.foreachRDD { rdd =>
>> >   rdd.foreachPartition { partitionOfRecords =>
>> >     val connection = createNewConnection()
>> >     partitionOfRecords.foreach(record => connection.send(record))
>> >     connection.close()
>> >   }
>> > }
>> >
>> > What's I don't understand is we seems to create the connection locally,
>> create then close in the same block.
>> > Is there any chance that the connection will be re-used by other
>> workers (on other machines)?
>> > For example, I have 2 workers running in parallel all of them will
>> create connection pool to locator at pretty the same time
>> > Is the connection pool shared among all my workers? so the close on
>> worker #1 will impact the connection on worker #2.
>> >
>> > On Tue, Jul 17, 2018 at 11:55 AM John Blum <[email protected]> wrote:
>> >>
>> >> Hi Trung-
>> >>
>> >> You definitely should not be opening and closing a connection each
>> time you process a record; that is excessive.
>> >>
>> >> I can tell you right now that, between clientCache.close(), say, in
>> the first iteration, and ClientCacheFactory.create() in the second
>> iteration, it is unlikely that Geode completely "closed" the ClientCache
>> instance (releasing all the resources) before the ClientCache instance is
>> recreated in the subsequent iteration, hence the reason you hit a
>> CacheClosedException or RegionExistsException.
>> >>
>> >> I have run into similar problems in a single test class, where I need
>> to cycle the cache instance between test case methods in the same test
>> class (suite).
>> >>
>> >> Having said that, it would be better to structure the code as...
>> >>
>> >> ClientCache cache = new ClientCacheFactory()
>> >>     .addPoolLocator(locator, 10334)
>> >>     .set("log-level", "WARN")
>> >>     .create();
>> >>
>> >> Region< String, Transaction > region =
>> >>     cache.<String, Transaction
>> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>> >>
>> >> try {
>> >>
>> >>     results.foreachRDD(
>> >>         rdd -> rdd.foreachPartition(
>> >>             records -> {
>> >>                 processing(region,records)
>> >>             }
>> >>         )
>> >>     );
>> >> }
>> >> finally {
>> >>     cache.close();
>> >> }
>> >>
>> >> Why do you need to open/close the connection for each record/batch
>> anyhow?
>> >>
>> >> Regards,
>> >> -John
>> >>
>> >>
>> >> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <[email protected]>
>> wrote:
>> >>>
>> >>> Dear all,
>> >>>
>> >>> I am looking for a correct way to write output of a spark streaming
>> job to a geode's region
>> >>>
>> >>> I'm trying following codes:
>> >>>
>> >>> results.foreachRDD(
>> >>>                     rdd -> rdd.foreachPartition(
>> >>>                             records ->
>> >>>                             {
>> >>>                                     //STEP 1
>> >>>                                     ClientCache cache = new
>> ClientCacheFactory().addPoolLocator(locator, 10334)
>> >>>                                     .set("log-level",
>> "WARN").create();
>> >>>
>> >>>                                     //STEP 2
>> >>>                                     Region< String, Transaction >
>> region = cache.<String, Transaction
>> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>> >>>
>> >>>                                     //STEP 3
>> >>>                                     processing(region,records)
>> >>>
>> >>>                                     //STEP 4
>> >>>                                     cache.close();
>> >>>                             }
>> >>>                     )
>> >>>                 );
>> >>>
>> >>> 1/ If I'm using above code, then I will get following errors:
>> >>>                org.apache.geode.cache.CacheClosedException: The cache
>> is closed.
>> >>>                org.apache.geode.cache.RegionExistsException:
>> /transactions
>> >>>
>> >>>         I think Spark is trying to re-use the connection to geode, in
>> second batch when connection is already closed it raise exceptions
>> >>>
>> >>> 2/ I comment out the //STEP 4 (cache.close())
>> >>>               The spark job running BUT THE DATA IS MISSING RANDOMLY
>> >>>
>> >>> 3/ I'm writing stanalone application to repeat step 1 -> 4 in while
>> loop
>> >>>               while(true) {
>> >>>                    //STEP 1,
>> >>>                    //STEP 2
>> >>>                    //STEP 3
>> >>>                    //STEP 4
>> >>>               }
>> >>>
>> >>>           The application is running well without exception or
>> loosing data.
>> >>>
>> >>>
>> >>> Any thoughts is really appriciated
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Thanks
>> >>> Kien
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> -John
>> >> john.blum10101 (skype)
>> >
>> > --
>> > Thanks
>> > Kien
>>
> --
> Thanks
> Kien
>

Reply via email to