Re: How to close connection in mapPartitions?
Hi Bin,

Very likely the RedisClientPool is being closed too quickly, before the map has a chance to use it. One way to verify would be to comment out the .close line and see what happens.

FWIW, I saw a similar problem writing to Solr, where I put a commit where you have a close, and noticed that the commit happened before the actual data insertion (in the .map line) did, with no data showing up in the index until the next time I ran the code :-). At the time I got around it by doing a zipWithIndex on the Iterator, doing a partial commit every n records, and finally doing a commit from the driver code. However, that won't work for you, and there is a better way outlined on this page (look for Tobias Pfeiffer; it's the code block immediately following):

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

The idea is to test hasNext on the iterator within the scope of the .map call, and call close if it's the last element.

-sujit

On Thu, Oct 22, 2015 at 11:32 PM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:
> [quoted messages trimmed; Aniket's and Bin's messages appear in full below]
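A minimal sketch of the pattern Sujit describes, adapted to the code in this thread (RedisCache, Parser.parseBody, redisUrl and redisPort are the original poster's identifiers and are assumed to behave as described; this is not a tested implementation):

```scala
// Sketch: close the pool only after the last element of the partition
// has been produced, by checking hasNext on the partition iterator
// from inside the .map body (the pattern from the Cloudera post above).
val seqs = lines.mapPartitions { iter =>
  val cache = new RedisCache(redisUrl, redisPort)
  iter.map { line =>
    val parsed = Parser.parseBody(line, cache)
    // Only once the last element is being produced is it safe to close.
    if (!iter.hasNext) cache.redisPool.close
    parsed
  }
}
```

One caveat with this sketch: if a partition can be empty, the .map body never runs and the pool is never closed, so guard for that case (e.g. close immediately when `iter.isEmpty`) if it can occur in your job.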
Re: How to close connection in mapPartitions?
Are you sure RedisClientPool is being initialized properly in the constructor of RedisCache? Can you please copy and paste the code you use to initialize RedisClientPool inside the constructor of RedisCache?

Thanks,
Aniket

On Fri, Oct 23, 2015 at 11:47 AM, Bin Wang wrote:
> [quoted message trimmed; Bin's message appears in full below]
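For reference, a hypothetical sketch of the kind of constructor Aniket is asking about — RedisCache is the poster's own class (its real contents are not shown in this thread), while RedisClientPool and withClient are from the scala-redis library that appears in the stack trace:

```scala
import com.redis.RedisClientPool

// Hypothetical reconstruction of the poster's class, for illustration only.
class RedisCache(host: String, port: Int) {
  // The pool must be created once per RedisCache instance and stay open
  // for as long as any caller may still invoke withClient on it.
  val redisPool = new RedisClientPool(host, port)

  // Example lookup in the style of the getExpId seen in the stack trace.
  def getExpId(key: String): Option[String] =
    redisPool.withClient(client => client.get(key))
}
```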
Re: How to close connection in mapPartitions?
BTW, "lines" is a DStream.

Bin Wang wrote on Friday, Oct 23, 2015 at 2:16 PM:

> I use mapPartitions to open connections to Redis. I write it like this:
>
>     val seqs = lines.mapPartitions { lines =>
>       val cache = new RedisCache(redisUrl, redisPort)
>       val result = lines.map(line => Parser.parseBody(line, cache))
>       cache.redisPool.close
>       result
>     }
>
> But it seems the pool is closed before I use it. Am I doing anything wrong? Here is the error:
>
> java.lang.IllegalStateException: Pool not open
>     at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>     at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>     at com.redis.RedisClientPool.withClient(Pool.scala:34)
>     at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>     at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>     at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>     at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>     at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
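The reason the pool is closed "before I use it" is that Scala's Iterator.map is lazy: the function passed to .map runs only when the resulting iterator is consumed, and Spark consumes it after the mapPartitions closure has returned, i.e. after .close has already run. A small plain-Scala illustration of that ordering, with no Spark involved:

```scala
// Iterator.map is lazy: the mapping body does not run when map is
// called, only when the resulting iterator is actually consumed.
var order = List.empty[String]
val it = Iterator(1, 2, 3).map { x => order :+= s"mapped $x"; x * 2 }
order :+= "map returned"   // recorded before any element is mapped
val doubled = it.toList    // consuming the iterator runs the map body
// order is List("map returned", "mapped 1", "mapped 2", "mapped 3")
// doubled is List(2, 4, 6)
```

In the posted code, `cache.redisPool.close` plays the role of the "map returned" step above, which is why the first call to withClient finds the pool already closed.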