Hi Pankaj,

Sorry, I'm not a Thrift user, but looking at your issue, I would recommend testing the same thing with the latest versions of all those applications.
I mean, can you retry with Hadoop > 1.x and HBase 0.94.3?

JM

2012/11/21, Pankaj Misra <[email protected]>:
> Dear All,
>
> Requesting and looking forward for community's help on the issue, as
> indicated in the thread below. Thanks.
>
> Regards
> Pankaj Misra
>
> -----Original Message-----
> From: Pankaj Misra
> Sent: Tuesday, November 20, 2012 12:36 PM
> To: [email protected]
> Subject: RE: HBase NonBlocking and Async Thrift
>
> Dear All,
>
> Requesting for help on the async thrift protocol for non-blocking streaming
> mode, would greatly appreciate any input on the issue outlined in the thread
> below. Putting a wait/delay defeats the purpose of using the async
> capability. Please help.
>
> Thanks & Regards
> Pankaj Misra
>
> -----Original Message-----
> From: Pankaj Misra
> Sent: Monday, November 19, 2012 5:47 PM
> To: [email protected]
> Subject: HBase NonBlocking and Async Thrift
>
> Dear All,
>
> I am currently using Hadoop 0.23.1 with HBase 0.94.1 in a pseudo-distributed
> mode. I am trying to use HBase Thrift API (not using Thrift2 yet) in a
> nonblocking and async mode to insert a bulk of records. I am sharing the set
> of steps for everyone's information and setting the context to my problem
>
> Please find below the code that I am using for initializing the async
> client
>
>     TBinaryProtocol.Factory binProtoFactory=new TBinaryProtocol.Factory();
>
>     TAsyncClientManager clientManager=null;
>     TNonblockingSocket nonBlockingSocket=null;
>
>     try {
>         clientManager=new TAsyncClientManager();
>     } catch (IOException e) {
>         throw new RuntimeException(e);
>     }
>     try {
>         nonBlockingSocket=new TNonblockingSocket(HOST_NAME,PORT_NUMBER);
>     } catch (IOException e) {
>         throw new RuntimeException(e);
>     }
>
> And, I am initializing the client as shown below
>
>     Hbase.AsyncClient client=new Hbase.AsyncClient(binProtoFactory, clientManager, nonBlockingSocket);
>
> I could see two ways of using the client, i.e. one client for all the
> records to be inserted or separate instance of client for every record. I
> thought since this is a non-blocking channel, it would make sense to
> initialize 1 client for all the requests, since all the requests would be
> streamed using a framed transport.
>
>     // 1 async client for all the requests
>     Hbase.AsyncClient client=new Hbase.AsyncClient(binProtoFactory, clientManager, nonBlockingSocket);
>
>     // mutate rows called in a loop to insert multiple records, using the same client
>     client.mutateRow(table, ByteBuffer.wrap(key), mutations,mutationAttributes,new HBaseInsertAsyncHandler());
>
> But soon I found that I was wrong as I got back the following error.
>
>     java.lang.IllegalStateException: Client is currently executing another method: org.apache.hadoop.hbase.thrift.generated.Hbase$AsyncClient$mutateRow_call
>         at org.apache.thrift.async.TAsyncClient.checkReady(TAsyncClient.java:78)
>         at org.apache.hadoop.hbase.thrift.generated.Hbase$AsyncClient.mutateRow(Hbase.java:2714)
>
> Reading through the following JIRA educated me a bit more on this
>
> https://issues.apache.org/jira/browse/THRIFT-945
>
> So, I changed my code to initialize the client per record to be inserted.
>
>     //called both these statements for every record to be inserted in a loop
>     Hbase.AsyncClient client=new Hbase.AsyncClient(binProtoFactory, clientManager, nonBlockingSocket);
>     client.mutateRow(table, ByteBuffer.wrap(key), mutations,mutationAttributes,new HBaseInsertAsyncHandler());
>
> Even this failed with the following error
>
>     2012-11-19 17:25:15,275 WARN [TAsyncClientManager#SelectorThread 9] async.TAsyncClientManager (TAsyncClientManager.java:startPendingMethods(177)) - Caught exception in TAsyncClientManager!
>     java.nio.channels.ClosedChannelException
>         at java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:167)
>         at java.nio.channels.SelectableChannel.register(SelectableChannel.java:254)
>         at org.apache.thrift.transport.TNonblockingSocket.registerSelector(TNonblockingSocket.java:99)
>         at org.apache.thrift.async.TAsyncMethodCall.start(TAsyncMethodCall.java:141)
>         at org.apache.thrift.async.TAsyncClientManager$SelectThread.startPendingMethods(TAsyncClientManager.java:169)
>         at org.apache.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)
>
> So, it looked to me as if the channel registration could not happen in time
> for it to get initialized and since the records are getting inserted in a
> loop, it possibly needs a time window for initialization to get complete and
> insert the record. So I had to introduce a delay for every such insertion,
> which I do not prefer to do. The code changes for that are as shown below
>
>     Hbase.AsyncClient client=new Hbase.AsyncClient(binProtoFactory, clientManager, nonBlockingSocket);
>     client.mutateRow(table, ByteBuffer.wrap(key), mutations,mutationAttributes,new HBaseInsertAsyncHandler());
>     synchronized (client.getProtocolFactory()) {
>         client.getProtocolFactory().wait(20);
>     }
>
> With the above change, I could see the records getting inserted into HBase
> using async thrift client, but I think this is not the right solution and
> will look for some guidance from the community to have a more consistent way
> to utilize the the async thrift capability without any specific wait or
> sleep times, as putting a wait call, kills the async advantage and
> introduces delays in the overall throughput. Looking forward for your help.
>
> Thanks and Regards
> Pankaj Misra
>
> ________________________________
>
> Neustar VP and Impetus CEO to present on 'Innovative information services
> powered by Cloud and Big Data technologies'at Cloud Expo - Santa Clara, Nov
> 6th.
> http://www.impetus.com/events#2.
>
> Check out Impetus contribution to build Luminar - a new business unit at
> Entravision. http://lf1.me/MS/
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
