It helped, thanks.
Now I have a single reusable BufferedMutator instance, and I don't call
.close() after mutations; I call .flush() instead. Is that OK?

2016-02-09 23:09 GMT+01:00 Stack <st...@duboce.net>:

> On Tue, Feb 9, 2016 at 9:46 AM, Serega Sheypak <serega.shey...@gmail.com>
> wrote:
>
> > I've modified my code:
> >
> > void saveUsers(Collection<User> users){
> >         if(users && !users.isEmpty()){
> >
> >             // Get Connection instance (created once); BM is new for each request.
> >
>
>
> If new for each request, don't bother using BM. Otherwise, put the BM in
> same place you keep your Connection and just be sure to call close on BM
> when your app goes down.
> St.Ack
>
>
>
>
>
>
> >             def mutator =
> > getConnection().getBufferedMutator(getBufferedMutatorParams())
> >
> >              List<Put> putList = users.collect{toPut(it)}
> >             mutator.mutate(putList)
> >             *mutator.close() // exception here*
> >         }
> >     }
> >
> > Exception is still thrown
> >
> > 2016-02-09 15:43 GMT+01:00 Stack <st...@duboce.net>:
> >
> > > On Tue, Feb 9, 2016 at 6:37 AM, Serega Sheypak <serega.shey...@gmail.com>
> > > wrote:
> > >
> > > > Hi, thanks for reply!
> > > >
> > > >
> > > > > What should we add here to make the doc more clear on BufferedMutator?
> > > > >
> > > > > http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html
> > > > It's pretty clear.
> > > >
> > > > > And in the example?
> > > > The example shows that both the connection and the buffered mutator are
> > > > thread-safe and that they are closed at once.
> > > > - Should I keep a single instance of the connection and BufferedMutator per
> > > > thread?
> > > >
> > >
> > > No. Connection says you should generally share the Connection instance.
> > > Ditto for BufferedMutator. It is backed by a buffer so you are batching
> > > your writes when you use it. If you share the BM, your batching will be more
> > > 'regular'.
> > >
> > >
> > > > - Should I keep a connection per thread and instantiate a mutator for each
> > > > request?
> > > >
> > >
> > > See above.
> > >
> > >
> > > > - what happens if autoflush enabled?
> > > >
> > >
> > > When the backing buffer is full, it gets flushed to the cluster.
> > >
> > >
> > > > - is it possible to turn on sync mode for saving data?
> > > >
> > >
> > > Yes. Don't use a BufferedMutator. Your throughput will go down as you do an
> > > RPC per write.
> > >
> > >
> > > > - Connection could get into a failed/invalid state for some reason (RS down,
> > > > RS up, some network partition happened). Is it possible?
> > >
> > >
> > >
> > > It could happen but I think by now we've seen nearly all of the ways in
> > > which a Connection can fail and internally, it compensates.
> > >
> > >
> > >
> > > > If it's
> > > > possible, then what is the right way to handle it: "close" failed
> > > > connection and ask for new one?
> > > >
> > > >
> > > Good question. Connection internally will retry and 'ride over' nearly all
> > > cluster issues except the catastrophic ones.
> > >
> > > St.Ack
> > >
> > >
> > >
> > > > > I assume users is a collection of Users.
> > > > > Have you tried obtaining / closing the mutator for each User instead of
> > > > > sharing the mutator?
> > > >
> > > > > If another flush, say because there were lots of puts, then when close
> > > > > comes in, we are out of threads and you'd get below.
> > > > Looks like it's the root cause, let me try!
> > > >
> > > > Thank you for detailed explanation!
> > > >
> > > >
> > > >
> > > > 2016-02-09 7:10 GMT+01:00 Stack <st...@duboce.net>:
> > > >
> > > > > On Mon, Feb 8, 2016 at 3:01 PM, Serega Sheypak <serega.shey...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi, I'm confused by the new HBase 1.0 API. The API says that applications
> > > > > > should manage connections (previously HConnections) on their own. Nothing
> > > > > > is managed internally now.
> > > > > >
> > > > > >
> > > > > That is right.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > > Here is an example:
> > > > > >
> > > > > > https://hbase.apache.org/xref/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.html
> > > > > >
> > > > > > It gives no clue about lifecycle :(
> > > > > >
> > > > >
> > > > >
> > > > > Connection is fairly explicit:
> > > > >
> > > > > http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html
> > > > >
> > > > > What should we add here to make the doc more clear on BufferedMutator?
> > > > >
> > > > > http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html
> > > > >
> > > > > And in the example?
> > > > >
> > > > >
> > > > >
> > > > > > Right now I create a single connection instance for the servlet and a
> > > > > > BufferedMutator per request.
> > > > > >
> > > > > >
> > > > > >
> > > > > The recommendation is a single BufferedMutator shared across requests.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > > // getConnection returns a single instance; it doesn't return a new
> > > > > > // connection each time
> > > > > > def mutator =
> > > > > > getConnection().getBufferedMutator(getBufferedMutatorParams())
> > > > > >
> > > > >
> > > > >
> > > > > getConnection is your method?
> > > > >
> > > > >
> > > > > getBufferedMutator creates a new one?
> > > > >
> > > > >
> > > > >
> > > > > > users.each{ mutator.mutate(toPut(it))}
> > > > > > mutator.close() //exception is thrown here
> > > > > >
> > > > > >
> > > > > The close is flushing out all the writes.
> > > > >
> > > > > If there is a BufferedMutator per servlet instance, there are probably many
> > > > > when many requests are coming in.
> > > > >
> > > > > See what happens when you create one:
> > > > >
> > > > > http://hbase.apache.org/xref/org/apache/hadoop/hbase/client/ConnectionImplementation.html#313
> > > > >
> > > > > which calls through to here....
> > > > >
> > > > > http://hbase.apache.org/xref/org/apache/hadoop/hbase/client/HTable.html#126
> > > > >
> > > > > ... which creates an executor of max 1 task only.
> > > > >
> > > > > If another flush, say because there were lots of puts, then when close
> > > > > comes in, we are out of threads and you'd get below.
> > > > >
> > > > > St.Ack
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > >
> > > > > > And I get tons of exceptions thrown on "mutator.close()". What am I doing
> > > > > > wrong?
> > > > > >
> > > > > > WARNING: #905, the task was rejected by the pool. This is unexpected.
> > > > > > Server is node04.server.com, 60020,1447338864601
> > > > > > java.util.concurrent.RejectedExecutionException: Task
> > > > > > java.util.concurrent.FutureTask@5cff3b40 rejected from
> > > > > > java.util.concurrent.ThreadPoolExecutor@686c2853[Terminated, pool size = 0,
> > > > > > active threads = 0, queued tasks = 0, completed tasks = 1]
> > > > > > at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> > > > > > at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
> > > > > > at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
> > > > > > at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
> > > > > > at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.sendMultiAction(AsyncProcess.java:956)
> > > > > > at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.access$000(AsyncProcess.java:574)
> > > > > > at org.apache.hadoop.hbase.client.AsyncProcess.submitMultiActions(AsyncProcess.java:423)
> > > > > > at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:403)
> > > > > > at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:320)
> > > > > > at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206)
> > > > > > at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:158)
> > > > > > at org.apache.hadoop.hbase.client.BufferedMutator$close$0.call(Unknown Source)
> > > > > >
> > > > >
> > > >
> > >
> >
>
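
For anyone reading the trace quoted above: the message says the task was
rejected from a ThreadPoolExecutor that was already in the Terminated state.
Below is a minimal JDK-only sketch of that general behaviour. It is not HBase
code; the class name RejectedAfterShutdown and its method are made up for
illustration, and whether this is exactly the path hit in the thread is an
assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class, not part of HBase.
public class RejectedAfterShutdown {

    // Returns true if a task submitted to an already shut-down pool is rejected.
    static boolean submitAfterShutdownIsRejected() {
        // A pool with a single worker thread, using the default AbortPolicy.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        // After shutdown() the pool accepts no new tasks.
        pool.shutdown();
        try {
            pool.submit(() -> { /* stand-in for a flush task */ });
            return false;
        } catch (RejectedExecutionException e) {
            // Same exception type as in the quoted trace.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + submitAfterShutdownIsRejected());
    }
}
```

This fits the advice in the thread: keep one BufferedMutator next to the
shared Connection, call flush() when the writes need to go out, and call
close() only when the app goes down.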
