You can avoid read & write running parallel from your application level, if I read your mail correctly. You can use ReentrantReadWriteLock if your intention is like that. But it's not recommended. HBase has its own mechanism(MVCC) to manage the read/write consistency. When we start a scanning, the latest data has not committed by MVCC may not be visible(According to our configuration).
Jieshan -----Original Message----- From: Bing Li [mailto:[email protected]] Sent: Thursday, September 20, 2012 10:02 AM To: [email protected]; user Subject: Is it correct and required to keep consistency this way? Dear all, Sorry to send the email multiple times! An error in the previous email is corrected. I am not exactly sure if it is correct and required to keep consistency as follows when saving and reading from HBase? Your help is highly appreciated. Best regards, Bing // Writing public void AddOutgoingNeighbor(String hostNodeKey, String groupKey, int timingScale, String neighborKey) { List<Put> puts = new ArrayList<Put>(); Put hostNodeKeyPut; Put groupKeyPut; Put topGroupKeyPut; Put timingScalePut; Put neighborKeyPut; byte[] outgoingRowKey = Bytes.toBytes(NeighborStructure.NODE_OUTGOING_NEIGHBOR_ROW + Tools.GetAHash(hostNodeKey + groupKey + timingScale + neighborKey)); hostNodeKeyPut = new Put(outgoingRowKey); hostNodeKeyPut.add(NeighborStructure.NODE_OUTGOING_NEIGHBOR_FAMILY, NeighborStructure.NODE_OUTGOING_NEIGHBOR_HOST_NODE_KEY_COLUMN, Bytes.toBytes(hostNodeKey)); puts.add(hostNodeKeyPut); groupKeyPut = new Put(outgoingRowKey); groupKeyPut.add(NeighborStructure.NODE_OUTGOING_NEIGHBOR_FAMILY, NeighborStructure.NODE_OUTGOING_NEIGHBOR_GROUP_KEY_COLUMN, Bytes.toBytes(groupKey)); puts.add(groupKeyPut); topGroupKeyPut = new Put(outgoingRowKey); topGroupKeyPut.add(NeighborStructure.NODE_OUTGOING_NEIGHBOR_FAMILY, NeighborStructure.NODE_OUTGOING_NEIGHBOR_TOP_GROUP_KEY_COLUMN, Bytes.toBytes(GroupRegistry.WWW().GetParentGroupKey(groupKey))); puts.add(topGroupKeyPut); timingScalePut = new Put(outgoingRowKey); timingScalePut.add(NeighborStructure.NODE_OUTGOING_NEIGHBOR_FAMILY, NeighborStructure.NODE_OUTGOING_NEIGHBOR_TIMING_SCALE_COLUMN, Bytes.toBytes(timingScale)); puts.add(timingScalePut); neighborKeyPut = new Put(outgoingRowKey); neighborKeyPut.add(NeighborStructure.NODE_OUTGOING_NEIGHBOR_FAMILY, NeighborStructure.NODE_OUTGOING_NEIGHBOR_NEIGHBOR_KEY_COLUMN, Bytes.toBytes(neighborKey)); puts.add(neighborKeyPut); try { // Locking is here this.lock.writeLock().lock(); this.neighborTable.put(puts); this.lock.writeLock().unlock(); } catch (IOException e) { e.printStackTrace(); } } // Reading public Set<String> GetOutgoingNeighborKeys(String hostNodeKey, int timingScale) { List<Filter> outgoingNeighborsList = new ArrayList<Filter>(); SingleColumnValueFilter hostNodeKeyFilter = new SingleColumnValueFilter(NeighborStructure.NODE_OUTGOING_NEIGHBOR_FAMILY, NeighborStructure.NODE_OUTGOING_NEIGHBOR_HOST_NODE_KEY_COLUMN, CompareFilter.CompareOp.EQUAL, new SubstringComparator(hostNodeKey)); hostNodeKeyFilter.setFilterIfMissing(true); outgoingNeighborsList.add(hostNodeKeyFilter); SingleColumnValueFilter timingScaleFilter = new SingleColumnValueFilter(NeighborStructure.NODE_OUTGOING_NEIGHBOR_FAMILY, NeighborStructure.NODE_OUTGOING_NEIGHBOR_TIMING_SCALE_COLUMN, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(timingScale))); timingScaleFilter.setFilterIfMissing(true); outgoingNeighborsList.add(timingScaleFilter); FilterList outgoingNeighborFilter = new FilterList(outgoingNeighborsList); Scan scan = new Scan(); scan.setFilter(outgoingNeighborFilter); scan.setCaching(Parameters.CACHING_SIZE); scan.setBatch(Parameters.BATCHING_SIZE); String qualifier; Set<String> neighborKeySet = Sets.newHashSet(); try { // Lock is here this.lock.readLock().lock(); ResultScanner scanner = this.neighborTable.getScanner(scan); for (Result result : scanner) { for (KeyValue kv : result.raw()) { qualifier = Bytes.toString(kv.getQualifier()); if (qualifier.equals(NeighborStructure.NODE_OUTGOING_NEIGHBOR_NEIGHBOR_KEY_STRING_COLUMN)) { neighborKeySet.add(Bytes.toString(kv.getValue())); } } } scanner.close(); this.lock.readLock().unlock(); } catch (IOException e) { e.printStackTrace(); } return neighborKeySet; }
