Hi Trung,

One pattern that comes to mind is to create a `CacheProvider` object
that lazily creates the cache and region objects:

```scala
object CacheProvider {
lazy val cache = new ClientCacheFactory()...
lazy val region = ....
}
```

Then update your `foreachPartition` function to use this object

```scala
records -> {
  val cache = CacheProvider.cache
  val region = CacheProvider.region
  ...
```

This way the cache is not created until it is needed on the worker.
You'll want to delay closing the cache until your Spark application
ends; there are Spark event listeners you can register to cleanup
resources. I have not tested this code myself, so it might need some
tweaking.

--Bradford


On Tue, Jul 17, 2018 at 2:10 PM trung kien <[email protected]> wrote:
>
> Hi John,
>
> The problem is that outside of rdd.foreachPartition block, the code is not 
> executed by single worker.
> In Spark streaming, each parition will be handle by a worker in parallel.
> So multiple workers will work on its own processing(region,records) at the 
> same time.
>
> If I open connection outside of that block, it's not passed through workers.
> The recommend way is creating connection inside that block
> http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html (Design 
> Patterns for using foreachRDD)
>
> dstream.foreachRDD { rdd =>
>   rdd.foreachPartition { partitionOfRecords =>
>     val connection = createNewConnection()
>     partitionOfRecords.foreach(record => connection.send(record))
>     connection.close()
>   }
> }
>
> What's I don't understand is we seems to create the connection locally, 
> create then close in the same block.
> Is there any chance that the connection will be re-used by other workers (on 
> other machines)?
> For example, I have 2 workers running in parallel all of them will create 
> connection pool to locator at pretty the same time
> Is the connection pool shared among all my workers? so the close on worker #1 
> will impact the connection on worker #2.
>
> On Tue, Jul 17, 2018 at 11:55 AM John Blum <[email protected]> wrote:
>>
>> Hi Trung-
>>
>> You definitely should not be opening and closing a connection each time you 
>> process a record; that is excessive.
>>
>> I can tell you right now that, between clientCache.close(), say, in the 
>> first iteration, and ClientCacheFactory.create() in the second iteration, it 
>> is unlikely that Geode completely "closed" the ClientCache instance 
>> (releasing all the resources) before the ClientCache instance is recreated 
>> in the subsequent iteration, hence the reason you hit a CacheClosedException 
>> or RegionExistsException.
>>
>> I have run into similar problems in a single test class, where I need to 
>> cycle the cache instance between test case methods in the same test class 
>> (suite).
>>
>> Having said that, it would be better to structure the code as...
>>
>> ClientCache cache = new ClientCacheFactory()
>>     .addPoolLocator(locator, 10334)
>>     .set("log-level", "WARN")
>>     .create();
>>
>> Region< String, Transaction > region =
>>     cache.<String, Transaction 
>> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>>
>> try {
>>
>>     results.foreachRDD(
>>         rdd -> rdd.foreachPartition(
>>             records -> {
>>                 processing(region,records)
>>             }
>>         )
>>     );
>> }
>> finally {
>>     cache.close();
>> }
>>
>> Why do you need to open/close the connection for each record/batch anyhow?
>>
>> Regards,
>> -John
>>
>>
>> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <[email protected]> wrote:
>>>
>>> Dear all,
>>>
>>> I am looking for a correct way to write output of a spark streaming job to 
>>> a geode's region
>>>
>>> I'm trying following codes:
>>>
>>> results.foreachRDD(
>>>                     rdd -> rdd.foreachPartition(
>>>                             records ->
>>>                             {
>>>                                     //STEP 1
>>>                                     ClientCache cache = new 
>>> ClientCacheFactory().addPoolLocator(locator, 10334)
>>>                                     .set("log-level", "WARN").create();
>>>
>>>                                     //STEP 2
>>>                                     Region< String, Transaction > region = 
>>> cache.<String, Transaction 
>>> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>>>
>>>                                     //STEP 3
>>>                                     processing(region,records)
>>>
>>>                                     //STEP 4
>>>                                     cache.close();
>>>                             }
>>>                     )
>>>                 );
>>>
>>> 1/ If I'm using above code, then I will get following errors:
>>>                org.apache.geode.cache.CacheClosedException: The cache is 
>>> closed.
>>>                org.apache.geode.cache.RegionExistsException: /transactions
>>>
>>>         I think Spark is trying to re-use the connection to geode, in 
>>> second batch when connection is already closed it raise exceptions
>>>
>>> 2/ I comment out the //STEP 4 (cache.close())
>>>               The spark job running BUT THE DATA IS MISSING RANDOMLY
>>>
>>> 3/ I'm writing stanalone application to repeat step 1 -> 4 in while loop
>>>               while(true) {
>>>                    //STEP 1,
>>>                    //STEP 2
>>>                    //STEP 3
>>>                    //STEP 4
>>>               }
>>>
>>>           The application is running well without exception or loosing data.
>>>
>>>
>>> Any thoughts is really appriciated
>>>
>>>
>>>
>>>
>>> --
>>> Thanks
>>> Kien
>>
>>
>>
>>
>> --
>> -John
>> john.blum10101 (skype)
>
> --
> Thanks
> Kien

Reply via email to