Hi John,

The problem is that outside of rdd.foreachPartition block, the code is not
executed by single worker.
In Spark streaming, each parition will be handle by a worker in parallel.
So multiple workers will work on its own processing(region,records) at the
same time.

If I open connection outside of that block, it's not passed through workers.
The recommend way is creating connection inside that block
http://spark.apache.org/docs/1.2.2/streaming-programming-guide.html (Design
Patterns for using foreachRDD)

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

What's I don't understand is we seems to create the connection locally,
create then close in the same block.
Is there any chance that the connection will be re-used by other workers
(on other machines)?
For example, I have 2 workers running in parallel all of them will create
connection pool to locator at pretty the same time
Is the connection pool shared among all my workers? so the close on worker
#1 will impact the connection on worker #2.

On Tue, Jul 17, 2018 at 11:55 AM John Blum <[email protected]> wrote:

> Hi Trung-
>
> You definitely should not be opening and closing a connection each time
> you process a record; that is excessive.
>
> I can tell you right now that, between clientCache.close(), say, in the
> first iteration, and ClientCacheFactory.create() in the second iteration,
> it is unlikely that Geode completely "closed" the ClientCache instance
> (releasing all the resources) before the ClientCache instance is
> recreated in the subsequent iteration, hence the reason you hit a
> CacheClosedException or RegionExistsException.
>
> I have run into similar problems in a single test class, where I need to
> cycle the cache instance between test case methods in the same test class
> (suite).
>
> Having said that, it would be better to structure the code as...
>
> ClientCache cache = new ClientCacheFactory()
>     .addPoolLocator(locator, 10334)
>     .set("log-level", "WARN")
>     .create();
>
> Region< String, Transaction > region =
>     cache.<String, Transaction
> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>
> try {
>
>     results.foreachRDD(
>         rdd -> rdd.foreachPartition(
>             records -> {
>                 processing(region,records)
>             }
>         )
>     );
> }
> finally {
>     cache.close();
> }
>
> Why do you need to open/close the connection for each record/batch anyhow?
>
> Regards,
> -John
>
>
> On Mon, Jul 16, 2018 at 11:15 PM, trung kien <[email protected]> wrote:
>
>> Dear all,
>>
>> I am looking for a correct way to write output of a spark streaming job
>> to a geode's region
>>
>> I'm trying following codes:
>>
>> results.foreachRDD(
>>                     rdd -> rdd.foreachPartition(
>>                             records ->
>>                             {
>>                                     //STEP 1
>>                                     ClientCache cache = new
>> ClientCacheFactory().addPoolLocator(locator, 10334)
>>                                     .set("log-level", "WARN").create();
>>
>>                                     //STEP 2
>>                                     Region< String, Transaction > region
>> = cache.<String, Transaction
>> >createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");
>>
>>                                     //STEP 3
>>                                     processing(region,records)
>>
>>                                     //STEP 4
>>                                     cache.close();
>>                             }
>>                     )
>>                 );
>>
>> 1/ If I'm using above code, then I will get following errors:
>>                org.apache.geode.cache.CacheClosedException: The cache is
>> closed.
>>                org.apache.geode.cache.RegionExistsException: /
>> transactions
>>
>>         I think Spark is trying to re-use the connection to geode, in
>> second batch when connection is already closed it raise exceptions
>>
>> 2/ I comment out the //STEP 4 (cache.close())
>>               The spark job running BUT THE DATA IS MISSING RANDOMLY
>>
>> 3/ I'm writing stanalone application to repeat step 1 -> 4 in while loop
>>               while(true) {
>>                    //STEP 1,
>>                    //STEP 2
>>                    //STEP 3
>>                    //STEP 4
>>               }
>>
>>           The application is running well without exception or loosing
>> data.
>>
>>
>> Any thoughts is really appriciated
>>
>>
>>
>>
>> --
>> Thanks
>> Kien
>>
>
>
>
> --
> -John
> john.blum10101 (skype)
>
-- 
Thanks
Kien

Reply via email to