Re: How to close connection in mapPartitions?

2015-10-23 Thread Sujit Pal
Hi Bin,

Very likely the RedisClientPool is being closed before the map has a chance
to use it. One way to verify would be to comment out the .close line and see
what happens. FWIW, I saw a similar problem writing to Solr, where I had a
commit in the place you have the close, and noticed that the commit happened
before the actual data insertion (in the .map line) did, so no data showed
up in the index until the next time I ran the code :-).
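
The root cause is that an Iterator's map is lazy: nothing inside the .map
runs until something downstream consumes the returned iterator, which
happens after your mapPartitions block (and therefore the .close) has
already returned. A quick plain-Scala sketch to convince yourself, nothing
Spark-specific about it:

val it = Iterator(1, 2, 3).map { x => println(s"mapping $x"); x }
println("nothing mapped yet") // the map function has not run at this point
it.foreach(_ => ())           // only now do the "mapping ..." lines print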

At the time I got around it by doing a zipWithIndex on the iterator, then
doing a partial commit every n records, and finally doing a commit from the
driver code. However, that won't work for you, and there is a better way
outlined on this page (look for Tobias Pfeiffer's comment; it's the code
block immediately following):

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

where you test hasNext on the iterator and call close when you reach the
last element, within the scope of the .map call.
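
Adapted to your snippet, it would look something like the sketch below
(untested; RedisCache, Parser and cache.redisPool.close come from your code,
and note that the pool is never closed for an empty partition):

val seqs = lines.mapPartitions { iter =>
  val cache = new RedisCache(redisUrl, redisPort)
  iter.map { line =>
    val parsed = Parser.parseBody(line, cache)
    // close the pool only after the last element of this partition is consumed
    if (!iter.hasNext) cache.redisPool.close
    parsed
  }
}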

-sujit

On Thu, Oct 22, 2015 at 11:32 PM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Are you sure RedisClientPool is being initialized properly in the
> constructor of RedisCache? Can you please copy-paste the code that you use
> to initialize RedisClientPool inside the constructor of RedisCache?
>
> Thanks,
> Aniket
>
> On Fri, Oct 23, 2015 at 11:47 AM Bin Wang  wrote:
>
>> BTW, "lines" is a DStream.
>>
>> On Fri, Oct 23, 2015 at 2:16 PM, Bin Wang wrote:
>>
>>> I use mapPartitions to open connections to Redis, and I write it like this:
>>>
>>> val seqs = lines.mapPartitions { lines =>
>>>   val cache = new RedisCache(redisUrl, redisPort)
>>>   val result = lines.map(line => Parser.parseBody(line, cache))
>>>   cache.redisPool.close
>>>   result
>>> }
>>>
>>> But it seems the pool is closed before I use it. Am I doing something
>>> wrong? Here is the error:
>>>
>>> java.lang.IllegalStateException: Pool not open
>>> at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>>> at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>>> at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>> at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>>> at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>>> at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>> at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>>> at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>> at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>


Re: How to close connection in mapPartitions?

2015-10-23 Thread Aniket Bhatnagar
Are you sure RedisClientPool is being initialized properly in the
constructor of RedisCache? Can you please copy-paste the code that you use
to initialize RedisClientPool inside the constructor of RedisCache?

Thanks,
Aniket

On Fri, Oct 23, 2015 at 11:47 AM Bin Wang  wrote:

> BTW, "lines" is a DStream.
>
> On Fri, Oct 23, 2015 at 2:16 PM, Bin Wang wrote:
>
>> I use mapPartitions to open connections to Redis, and I write it like this:
>>
>> val seqs = lines.mapPartitions { lines =>
>>   val cache = new RedisCache(redisUrl, redisPort)
>>   val result = lines.map(line => Parser.parseBody(line, cache))
>>   cache.redisPool.close
>>   result
>> }
>>
>> But it seems the pool is closed before I use it. Am I doing something
>> wrong? Here is the error:
>>
>> java.lang.IllegalStateException: Pool not open
>>  at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>>  at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>>  at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>  at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>>  at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>>  at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>  at scala.collection.immutable.List.foreach(List.scala:318)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>  at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>>  at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>  at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>>  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>


Re: How to close connection in mapPartitions?

2015-10-23 Thread Bin Wang
BTW, "lines" is a DStream.

On Fri, Oct 23, 2015 at 2:16 PM, Bin Wang wrote:

> I use mapPartitions to open connections to Redis, and I write it like this:
>
> val seqs = lines.mapPartitions { lines =>
>   val cache = new RedisCache(redisUrl, redisPort)
>   val result = lines.map(line => Parser.parseBody(line, cache))
>   cache.redisPool.close
>   result
> }
>
> But it seems the pool is closed before I use it. Am I doing something
> wrong? Here is the error:
>
> java.lang.IllegalStateException: Pool not open
>   at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>   at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>   at com.redis.RedisClientPool.withClient(Pool.scala:34)
>   at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>   at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>   at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>   at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>   at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
>