I've modified my code:
void saveUsers(Collection<User> users) {
    if (users && !users.isEmpty()) {
        // getConnection() returns a single Connection instance, created once.
        // A new BufferedMutator is created for each request.
        def mutator = getConnection().getBufferedMutator(getBufferedMutatorParams())
        List<Put> putList = users.collect { toPut(it) }
        mutator.mutate(putList)
        mutator.close() // exception is thrown here
    }
}
The exception is still thrown.
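
For reference, the trace quoted at the bottom of this thread reports the pool as "[Terminated, pool size = 0, ...]". Here is a minimal plain-JDK sketch (not HBase code; the class and method names are made up for illustration) showing that any task submitted to an executor that has already been shut down is rejected with the same exception type. It says nothing about why the pool behind the mutator ends up terminated; it only reproduces the symptom.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class RejectedAfterShutdownDemo {

    // Returns true if a task submitted after shutdown is rejected.
    public static boolean isRejectedAfterShutdown() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> { });                      // first task is accepted and runs
        pool.shutdown();                             // no new tasks are accepted from here on
        pool.awaitTermination(5, TimeUnit.SECONDS);  // wait until the pool is terminated
        try {
            pool.submit(() -> { });                  // default AbortPolicy rejects this task
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected after shutdown: " + isRejectedAfterShutdown());
    }
}
```

The default rejection handler of ThreadPoolExecutor is AbortPolicy, which is the frame at the top of the quoted trace.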
2016-02-09 15:43 GMT+01:00 Stack <[email protected]>:
> On Tue, Feb 9, 2016 at 6:37 AM, Serega Sheypak <[email protected]>
> wrote:
>
> > Hi, thanks for reply!
> >
> >
> > > What should we add here to make the doc more clear on BufferedMutator?
> > >
> > > http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html
> > It's pretty clear.
> >
> > > And in the example?
> > The example shows that both the connection and the buffered mutator are
> > thread safe and that they are closed at once.
> > - Should I keep a single instance of Connection and BufferedMutator per
> > thread?
> >
>
> No. Connection says you should generally share the Connection instance.
> Ditto for BufferedMutator. It is backed by a buffer so you are batching
> your writes when you use it. If you share the BM, your batching will be more
> 'regular'.
>
>
> > - Should I keep a connection per thread and instantiate a mutator for each
> > request?
> >
>
> See above.
>
>
> > - What happens if autoflush is enabled?
> >
>
> When the backing buffer is full, it gets flushed to the cluster.
>
>
> > - Is it possible to turn on sync mode for saving data?
> >
>
> Yes. Don't use a BufferedMutator. Your throughput will go down as you do an
> RPC per write.
>
>
> > - A Connection could get into a failed/invalid state for some reason (RS
> > down, RS up, a network partition happened). Is that possible?
>
>
>
> It could happen but I think by now we've seen nearly all of the ways in
> which a Connection can fail and internally, it compensates.
>
>
>
> > If it's possible, then what is the right way to handle it: "close" the
> > failed connection and ask for a new one?
> >
> >
> Good question. Connection internally will retry and 'ride over' nearly all
> cluster issues except the catastrophic ones.
>
> St.Ack
>
>
>
> > > I assume users is a collection of Users.
> > > Have you tried obtaining / closing the mutator for each User instead of
> > > sharing the mutator?
> >
> > > If another flush is in progress, say because there were lots of puts,
> > > then when close comes in, we are out of threads and you'd get below.
> > Looks like it's the root cause, let me try!
> >
> > Thank you for detailed explanation!
> >
> >
> >
> > 2016-02-09 7:10 GMT+01:00 Stack <[email protected]>:
> >
> > > On Mon, Feb 8, 2016 at 3:01 PM, Serega Sheypak <[email protected]>
> > > wrote:
> > >
> > > > Hi, I'm confused by the new HBase 1.0 API. The API says that applications
> > > > should manage connections (previously HConnections) on their own. Nothing
> > > > is managed internally now.
> > > >
> > > >
> > > That is right.
> > >
> > >
> > >
> > >
> > > > Here is an example:
> > > >
> > > > https://hbase.apache.org/xref/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.html
> > > >
> > > > It gives no clue about lifecycle :(
> > > >
> > >
> > >
> > > Connection is fairly explicit:
> > >
> > > http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html
> > >
> > > What should we add here to make the doc more clear on BufferedMutator?
> > >
> > > http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html
> > >
> > > And in the example?
> > >
> > >
> > >
> > > > Right now I create a single connection instance for the servlet and a
> > > > BufferedMutator per request.
> > > >
> > > >
> > > >
> > > The recommendation is a single BufferedMutator shared across requests.
> > >
> > >
> > >
> > >
> > > > // getConnection returns a single instance; it doesn't return a new
> > > > // connection each time
> > > > def mutator = getConnection().getBufferedMutator(getBufferedMutatorParams())
> > > >
> > >
> > >
> > > getConnection is your method?
> > >
> > >
> > > getBufferedMutator creates a new one?
> > >
> > >
> > >
> > > > users.each{ mutator.mutate(toPut(it))}
> > > > mutator.close() //exception is thrown here
> > > >
> > > >
> > > The close is flushing out all the writes.
> > >
> > > If a BufferedMutator is created per servlet instance, there are probably
> > > many of them when many requests are coming in.
> > >
> > > See what happens when you create one:
> > >
> > > http://hbase.apache.org/xref/org/apache/hadoop/hbase/client/ConnectionImplementation.html#313
> > >
> > > which calls through to here....
> > >
> > > http://hbase.apache.org/xref/org/apache/hadoop/hbase/client/HTable.html#126
> > >
> > > ... which creates an executor of max 1 task only.
> > >
> > > If another flush is in progress, say because there were lots of puts,
> > > then when close comes in, we are out of threads and you'd get below.
> > >
> > > St.Ack
> > >
> > >
> > >
> > >
> > > >
> > > > And I get tons of exceptions thrown on "mutator.close()". What am I
> > > > doing wrong?
> > > >
> > > > WARNING: #905, the task was rejected by the pool. This is unexpected.
> > > > Server is node04.server.com, 60020,1447338864601
> > > > java.util.concurrent.RejectedExecutionException: Task
> > > > java.util.concurrent.FutureTask@5cff3b40 rejected from
> > > > java.util.concurrent.ThreadPoolExecutor@686c2853[Terminated, pool size = 0,
> > > > active threads = 0, queued tasks = 0, completed tasks = 1]
> > > > at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> > > > at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
> > > > at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
> > > > at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
> > > > at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.sendMultiAction(AsyncProcess.java:956)
> > > > at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.access$000(AsyncProcess.java:574)
> > > > at org.apache.hadoop.hbase.client.AsyncProcess.submitMultiActions(AsyncProcess.java:423)
> > > > at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:403)
> > > > at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:320)
> > > > at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206)
> > > > at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:158)
> > > > at org.apache.hadoop.hbase.client.BufferedMutator$close$0.call(Unknown Source)
> > > >
> > >
> >
>