Dear all,

I am looking for a correct way to write output of a spark streaming job to
a geode's region

I'm trying following codes:

results.foreachRDD(
                    rdd -> rdd.foreachPartition(
                            records ->
                            {
                                    //STEP 1
                                    ClientCache cache = new
ClientCacheFactory().addPoolLocator(locator, 10334)
                                    .set("log-level", "WARN").create();

                                    //STEP 2
                                    Region< String, Transaction > region =
cache.<String, Transaction
>createClientRegionFactory(ClientRegionShortcut.PROXY).create("transactions");

                                    //STEP 3
                                    processing(region,records)

                                    //STEP 4
                                    cache.close();
                            }
                    )
                );

1/ If I'm using above code, then I will get following errors:
               org.apache.geode.cache.CacheClosedException: The cache is
closed.
               org.apache.geode.cache.RegionExistsException: /transactions

        I think Spark is trying to re-use the connection to geode, in
second batch when connection is already closed it raise exceptions

2/ I comment out the //STEP 4 (cache.close())
              The spark job running BUT THE DATA IS MISSING RANDOMLY

3/ I'm writing stanalone application to repeat step 1 -> 4 in while loop
              while(true) {
                   //STEP 1,
                   //STEP 2
                   //STEP 3
                   //STEP 4
              }

          The application is running well without exception or loosing data.


Any thoughts is really appriciated




-- 
Thanks
Kien

Reply via email to