Are you sharing this.rankTable between threads? HTable is not thread-safe. -- Lars
________________________________ From: Bing Li <[email protected]> To: "[email protected]" <[email protected]>; user <[email protected]> Sent: Tuesday, February 5, 2013 8:54 AM Subject: Re: Is "synchronized" required? Dear all, After "synchronized" was removed from the writing method, I get the following exceptions when reading. Before the removal, there were no such exceptions. Could you help me solve this? Thanks so much! Best wishes, Bing [java] Feb 6, 2013 12:21:31 AM org.apache.hadoop.hbase.ipc.HBaseClient$Connection run [java] WARNING: Unexpected exception receiving call responses [java] java.lang.NullPointerException [java] at org.apache.hadoop.hbase.io.HbaseObjectWritable.readObject(HbaseObjectWritable.java:521) [java] at org.apache.hadoop.hbase.io.HbaseObjectWritable.readFields(HbaseObjectWritable.java:297) [java] at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.receiveResponse(HBaseClient.java:593) [java] at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.run(HBaseClient.java:505) [java] Feb 6, 2013 12:21:31 AM org.apache.hadoop.hbase.client.ScannerCallable close [java] WARNING: Ignore, probably already closed [java] java.io.IOException: Call to greatfreeweb/127.0.1.1:60020 failed on local exception: java.io.IOException: Unexpected exception receiving call responses [java] at org.apache.hadoop.hbase.ipc.HBaseClient.wrapException(HBaseClient.java:934) [java] at org.apache.hadoop.hbase.ipc.HBaseClient.call(HBaseClient.java:903) [java] at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:150) [java] at $Proxy6.close(Unknown Source) [java] at org.apache.hadoop.hbase.client.ScannerCallable.close(ScannerCallable.java:112) [java] at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:74) [java] at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:39) [java] at 
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.getRegionServerWithRetries(HConnectionManager.java:1325) [java] at org.apache.hadoop.hbase.client.HTable$ClientScanner.nextScanner(HTable.java:1167) [java] at org.apache.hadoop.hbase.client.HTable$ClientScanner.next(HTable.java:1296) [java] at org.apache.hadoop.hbase.client.HTable$ClientScanner$1.hasNext(HTable.java:1356) [java] at com.greatfree.hbase.rank.NodeRankRetriever.LoadNodeGroupNodeRankRowKeys(NodeRankRetriever.java:348) [java] at com.greatfree.ranking.PersistNodeGroupNodeRanksThread.run(PersistNodeGroupNodeRanksThread.java:29) [java] at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [java] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [java] at java.lang.Thread.run(Thread.java:662) [java] Caused by: java.io.IOException: Unexpected exception receiving call responses [java] at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.run(HBaseClient.java:509) [java] Caused by: java.lang.NullPointerException [java] at org.apache.hadoop.hbase.io.HbaseObjectWritable.readObject(HbaseObjectWritable.java:521) [java] at org.apache.hadoop.hbase.io.HbaseObjectWritable.readFields(HbaseObjectWritable.java:297) [java] at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.receiveResponse(HBaseClient.java:593) [java] at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.run(HBaseClient.java:505) The code that causes the exceptions is as follows. 
public Set<String> LoadNodeGroupNodeRankRowKeys(String hostNodeKey, String groupKey, int timingScale) { List<Filter> nodeGroupFilterList = new ArrayList<Filter>(); SingleColumnValueFilter hostNodeKeyFilter = new SingleColumnValueFilter(RankStructure.NODE_GROUP_NODE_RANK_FAMILY, RankStructure.NODE_GROUP_NODE_RANK_HOST_NODE_KEY_COLUMN, CompareFilter.CompareOp.EQUAL, new SubstringComparator(hostNodeKey)); hostNodeKeyFilter.setFilterIfMissing(true); nodeGroupFilterList.add(hostNodeKeyFilter); SingleColumnValueFilter groupKeyFilter = new SingleColumnValueFilter(RankStructure.NODE_GROUP_NODE_RANK_FAMILY, RankStructure.NODE_GROUP_NODE_RANK_GROUP_KEY_COLUMN, CompareFilter.CompareOp.EQUAL, new SubstringComparator(groupKey)); groupKeyFilter.setFilterIfMissing(true); nodeGroupFilterList.add(groupKeyFilter); SingleColumnValueFilter timingScaleFilter = new SingleColumnValueFilter(RankStructure.NODE_GROUP_NODE_RANK_FAMILY, RankStructure.NODE_GROUP_NODE_RANK_TIMING_SCALE_COLUMN, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(timingScale))); timingScaleFilter.setFilterIfMissing(true); nodeGroupFilterList.add(timingScaleFilter); FilterList nodeGroupFilter = new FilterList(nodeGroupFilterList); Scan scan = new Scan(); scan.setFilter(nodeGroupFilter); scan.setCaching(Parameters.CACHING_SIZE); scan.setBatch(Parameters.BATCHING_SIZE); Set<String> rowKeySet = Sets.newHashSet(); try { ResultScanner scanner = this.rankTable.getScanner(scan); for (Result result : scanner) // <---- EXCEPTIONS are raised at this line. { for (KeyValue kv : result.raw()) { rowKeySet.add(Bytes.toString(kv.getRow())); break; } } scanner.close(); } catch (IOException e) { e.printStackTrace(); } return rowKeySet; } On Tue, Feb 5, 2013 at 4:20 AM, Bing Li <[email protected]> wrote: > Dear all, > > When writing data into HBase, sometimes I got exceptions. I guess they > might be caused by concurrent writings. But I am not sure. 
> > My question is whether it is necessary to put "synchronized" before > the writing methods? The following lines are the sample code. > > I think the directive, synchronized, must lower the performance of > writing. Sometimes concurrent writing is needed in my system. > > Thanks so much! > > Best wishes, > Bing > > public synchronized void AddDomainNodeRanks(String domainKey, int > timingScale, Map<String, Double> nodeRankMap) > { > List<Put> puts = new ArrayList<Put>(); > Put domainKeyPut; > Put timingScalePut; > Put nodeKeyPut; > Put rankPut; > > byte[] domainNodeRankRowKey; > > for (Map.Entry<String, Double> nodeRankEntry : nodeRankMap.entrySet()) > { > domainNodeRankRowKey = > Bytes.toBytes(RankStructure.DOMAIN_NODE_RANK_ROW + > Tools.GetAHash(domainKey + timingScale + nodeRankEntry.getKey())); > > domainKeyPut = new Put(domainNodeRankRowKey); > domainKeyPut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY, > RankStructure.DOMAIN_NODE_RANK_DOMAIN_KEY_COLUMN, > Bytes.toBytes(domainKey)); > puts.add(domainKeyPut); > > timingScalePut = new Put(domainNodeRankRowKey); > timingScalePut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY, > RankStructure.DOMAIN_NODE_RANK_TIMING_SCALE_COLUMN, > Bytes.toBytes(timingScale)); > puts.add(timingScalePut); > > nodeKeyPut = new Put(domainNodeRankRowKey); > nodeKeyPut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY, > RankStructure.DOMAIN_NODE_RANK_NODE_KEY_COLUMN, > Bytes.toBytes(nodeRankEntry.getKey())); > puts.add(nodeKeyPut); > > rankPut = new Put(domainNodeRankRowKey); > rankPut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY, > RankStructure.DOMAIN_NODE_RANK_RANKS_COLUMN, > Bytes.toBytes(nodeRankEntry.getValue())); > puts.add(rankPut); > } > > try > { > this.rankTable.put(puts); > } > catch (IOException e) > { > e.printStackTrace(); > } > }
