Hi Trung,
One pattern that comes to mind is to create a `CacheProvider` object
that lazily creates the cache and region objects:
```scala
object CacheProvider {
  // Each lazy val is initialized on first access, once per JVM
  lazy val cache = new ClientCacheFactory()...
  lazy val region = ....
}
```
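The lazy, once-per-JVM behavior can be seen in isolation with a small sketch. It does not use Geode at all: `FakeCache`, `LazyCacheProvider`, and `LazyDemo` are hypothetical names invented for illustration, and `FakeCache` only counts how many times it is constructed.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a Geode ClientCache, so the sketch runs without Geode.
final class FakeCache {
  // Count every construction so we can observe how often the cache is built.
  FakeCache.created.incrementAndGet()
}

object FakeCache {
  val created = new AtomicInteger(0)
}

object LazyCacheProvider {
  // Built on first access only; Scala makes lazy val initialization thread-safe.
  lazy val cache: FakeCache = new FakeCache
}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    println(FakeCache.created.get()) // 0: nothing has touched the provider yet
    val a = LazyCacheProvider.cache  // first access builds the cache
    val b = LazyCacheProvider.cache  // second access reuses the same instance
    println(FakeCache.created.get()) // 1
    println(a eq b)                  // true
  }
}
```

Because a Scala `object` is a singleton within a JVM, every task running in the same worker JVM would see the same instance, which is the property the pattern relies on.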
Then update your `foreachPartition` function to use this object:
```scala
records => {
  val cache = CacheProvider.cache
  val region = CacheProvider.region
  ...
}
```
This way the cache is not created until it is first needed on the worker,
and because a Scala `object` is a singleton, each worker JVM creates it
only once and reuses it across partitions and batches.
You'll want to delay closing the cache until your Spark application
ends; there are Spark event listeners you can register to clean up
resources. I have not tested this code myself, so it might need some
tweaking.
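As a sketch of the "close it late" idea, here is one way to tie the close to the lifetime of the JVM that owns the cache. Note that this swaps in a different technique from the Spark listeners mentioned above: it uses a plain JVM shutdown hook (`sys.addShutdownHook` from the Scala standard library), on the assumption that the cleanup should run in the same JVM where the cache was created. `ClosableCache` and `ManagedCacheProvider` are hypothetical names, and `ClosableCache` is only a stand-in for a real client cache.

```scala
// Hypothetical stand-in for a closeable client cache (not a Geode class).
final class ClosableCache extends AutoCloseable {
  @volatile private var closed = false
  def isClosed: Boolean = closed
  // Idempotent: calling close() more than once is harmless.
  override def close(): Unit = { closed = true }
}

object ManagedCacheProvider {
  // Created on first access; the hook is registered at the same moment,
  // so a JVM that never touches the cache never registers a hook.
  lazy val cache: ClosableCache = {
    val c = new ClosableCache
    // Runs when this JVM shuts down normally.
    sys.addShutdownHook(c.close())
    c
  }
}

object ShutdownDemo {
  def main(args: Array[String]): Unit = {
    val c = ManagedCacheProvider.cache
    println(c.isClosed) // false while the application is running
  }
}
```

Like the rest of this suggestion, this is untested against a real Spark and Geode setup; in particular, a shutdown hook does not run if the JVM is killed forcibly.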
--Bradford
On Tue, Jul 17, 2018 at 2:10 PM trung kien <[email protected]> wrote:
>
> Hi John,
>
> The problem is that, outside of the rdd.foreachPartition block, the code is
> not executed by a single worker.
> In Spark Streaming, each partition will be handled by a worker in parallel,
> so multiple workers will each run their own processing(region,records) at the
> same time.
>
> If I open the connection outside of that block, it is not passed through to
> the workers.
> The recommended way is to create the connection inside that block:
> http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html (Design
> Patterns for using foreachRDD)
>
> dstream.foreachRDD { rdd =>
>   rdd.foreachPartition { partitionOfRecords =>
>     val connection = createNewConnection()
>     partitionOfRecords.foreach(record => connection.send(record))
>     connection.close()
>   }
> }
>
> What I don't understand is that we seem to create the connection locally,
> creating and then closing it in the same block.
> Is there any chance that the connection will be reused by other workers (on
> other machines)?
> For example, I have 2 workers running in parallel; both of them will create a
> connection pool to the locator at pretty much the same time.
> Is the connection pool shared among all my workers, so that the close on
> worker #1 will impact the connection on worker #2?
>
> On Tue, Jul 17, 2018 at 11:55 AM John Blum <[email protected]> wrote:
>>
>> Hi Trung-
>>
>> You definitely should not be opening and closing a connection each time you
>> process a record; that is excessive.
>>
>> I can tell you right now that, between clientCache.close(), say, in the
>> first iteration, and ClientCacheFactory.create() in the second iteration, it
>> is unlikely that Geode completely "closed" the ClientCache instance
>> (releasing all the resources) before the ClientCache instance is recreated
>> in the subsequent iteration, hence the reason you hit a CacheClosedException
>> or RegionExistsException.
>>
>> I have run into similar problems in a single test class, where I need to
>> cycle the cache instance between test case methods in the same test class
>> (suite).
>>
>> Having said that, it would be better to structure the code as...
>>
>> ClientCache cache = new ClientCacheFactory()
>>     .addPoolLocator(locator, 10334)
>>     .set("log-level", "WARN")
>>     .create();
>>
>> Region<String, Transaction> region = cache
>>     .<String, Transaction>createClientRegionFactory(ClientRegionShortcut.PROXY)
>>     .create("transactions");
>>
>> try {
>>     results.foreachRDD(
>>         rdd -> rdd.foreachPartition(
>>             records -> {
>>                 processing(region, records);
>>             }
>>         )
>>     );
>> }
>> finally {
>>     cache.close();
>> }
>>
>> Why do you need to open/close the connection for each record/batch anyhow?
>>
>> Regards,
>> -John
>>
>>
>> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <[email protected]> wrote:
>>>
>>> Dear all,
>>>
>>> I am looking for the correct way to write the output of a Spark Streaming
>>> job to a Geode region.
>>>
>>> I'm trying the following code:
>>>
>>> results.foreachRDD(
>>>     rdd -> rdd.foreachPartition(
>>>         records -> {
>>>             //STEP 1
>>>             ClientCache cache = new ClientCacheFactory()
>>>                 .addPoolLocator(locator, 10334)
>>>                 .set("log-level", "WARN")
>>>                 .create();
>>>
>>>             //STEP 2
>>>             Region<String, Transaction> region = cache
>>>                 .<String, Transaction>createClientRegionFactory(ClientRegionShortcut.PROXY)
>>>                 .create("transactions");
>>>
>>>             //STEP 3
>>>             processing(region, records);
>>>
>>>             //STEP 4
>>>             cache.close();
>>>         }
>>>     )
>>> );
>>>
>>> 1/ If I use the above code, I get the following errors:
>>> org.apache.geode.cache.CacheClosedException: The cache is closed.
>>> org.apache.geode.cache.RegionExistsException: /transactions
>>>
>>> I think Spark is trying to reuse the connection to Geode; in the
>>> second batch, when the connection is already closed, it raises exceptions.
>>>
>>> 2/ If I comment out //STEP 4 (cache.close()),
>>> the Spark job runs BUT THE DATA IS MISSING RANDOMLY.
>>>
>>> 3/ I wrote a standalone application that repeats steps 1 -> 4 in a while loop:
>>> while(true) {
>>>     //STEP 1
>>>     //STEP 2
>>>     //STEP 3
>>>     //STEP 4
>>> }
>>>
>>> The application runs well, without exceptions or losing data.
>>>
>>>
>>> Any thoughts are really appreciated.
>>>
>>>
>>>
>>>
>>> --
>>> Thanks
>>> Kien
>>
>>
>>
>>
>> --
>> -John
>> john.blum10101 (skype)
>
> --
> Thanks
> Kien