Hi Bin,

Very likely the RedisClientPool is being closed before the lazy .map has a chance to use it. One way to verify would be to comment out the .close line and see what happens. FWIW, I saw a similar problem writing to Solr, where I put a commit where you have a close, and noticed that the commit was happening before the actual data insertion (in the .map line), with no data showing up in the index until the next time I ran the code :-).
At the time I got around it by doing a zipWithIndex on the Iterator, then doing a partial commit every n records, and finally doing a commit from the driver code. However, this won't work for you. There is a better way outlined on this page (look for Tobias Pfeiffer; it's the code block immediately following):
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
where you test hasNext on the iterator and call close on the last element, within the scope of the .map call.

-sujit

On Thu, Oct 22, 2015 at 11:32 PM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

> Are you sure RedisClientPool is being initialized properly in the
> constructor of RedisCache? Can you please copy-paste the code that you use
> to initialize RedisClientPool inside the constructor of RedisCache?
>
> Thanks,
> Aniket
>
> On Fri, Oct 23, 2015 at 11:47 AM Bin Wang <wbi...@gmail.com> wrote:
>
>> BTW, "lines" is a DStream.
>>
>> On Fri, Oct 23, 2015 at 2:16 PM, Bin Wang <wbi...@gmail.com> wrote:
>>
>>> I use mapPartitions to open connections to Redis. I write it like this:
>>>
>>> val seqs = lines.mapPartitions { lines =>
>>>   val cache = new RedisCache(redisUrl, redisPort)
>>>   val result = lines.map(line => Parser.parseBody(line, cache))
>>>   cache.redisPool.close
>>>   result
>>> }
>>>
>>> But it seems the pool is closed before I use it. Am I doing anything
>>> wrong?
>>> Here is the error:
>>>
>>> java.lang.IllegalStateException: Pool not open
>>>     at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>>>     at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>>>     at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>>     at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>>>     at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>>>     at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>>     at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>>>     at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>>     at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>>>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
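[Editor's note] The close-on-last-element pattern from the Cloudera post can be sketched without Spark or Redis. This is a minimal, self-contained illustration: DummyPool stands in for RedisClientPool and process stands in for the mapPartitions function; these names are illustrative only, not the real com.redis API.

```scala
// A stand-in for RedisClientPool that fails if used after close,
// mirroring the "Pool not open" error from the thread.
class DummyPool {
  private var open = true
  def use(x: Int): Int = {
    if (!open) throw new IllegalStateException("Pool not open")
    x * 2
  }
  def close(): Unit = { open = false }
}

// Plays the role of the mapPartitions function. Iterator.map is lazy, so
// calling pool.close() right after the map (as in the original code) would
// close the pool before any element is processed. Instead, close inside the
// map function once the last element has been seen.
def process(lines: Iterator[Int]): Iterator[Int] = {
  val pool = new DummyPool
  lines.map { n =>
    val out = pool.use(n)
    if (!lines.hasNext) pool.close()  // last element: now it is safe to close
    out
  }
}

println(process(Iterator(1, 2, 3)).toList)  // List(2, 4, 6)
```

The key point is that the close happens inside the map function, so it runs only after the final element has actually been pulled through the iterator chain.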