[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-18 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r336716904
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +89,120 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
+_versionTTL = versionTTL;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
   }
 
   @Override
-  public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
+  public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath, T value)
   throws IOException {
-// Take the ZNrecord and serialize it (get byte[])
+DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+  if (dataInZk == null || dataInZk.length == 0) {
+// No last write version exists, so start with 0
+return "0".getBytes();
+  }
+  // Last write exists, so increment and write it back
+  // **String conversion is necessary to make it display in ZK 
(zooinspector)**
+  String lastWriteVersionStr = new String(dataInZk);
+  long lastWriteVersion = Long.parseLong(lastWriteVersionStr);
+  lastWriteVersion++;
+  return String.valueOf(lastWriteVersion).getBytes();
+};
+
+// 1. Increment lastWriteVersion using DataUpdater
+ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor.doUpdate(
+rootPath + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, 
AccessOption.PERSISTENT);
+if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
+  throw new HelixException(
+  String.format("Failed to write the write version at path: %s!", 
rootPath));
+}
+
+// Successfully reserved a version number
+byte[] binaryVersion = (byte[]) result._updatedValue;
+String versionStr = new String(binaryVersion);
+final long version = Long.parseLong(versionStr);
+
+// 2. Write to the incremented last write version
+String versionedDataPath = rootPath + "/" + versionStr;
+
+// Take the ZNRecord and serialize it (get byte[])
 byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
 // Compress the byte[]
 byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
 // Compute N - number of buckets
 int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
 
-if (tryLock(path)) {
-  try {
-// Read or initialize metadata and compute the last success version 
index
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  metadataRecord = new ZNRecord(extractIdFromPath(path));
-}
-int lastSuccessIndex =
-(metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % 
_numVersions;
-String dataPath = path + "/" + lastSuccessIndex;
-
-List<String> paths = new ArrayList<>();
-List<byte[]> buckets = new ArrayList<>();
-
-int ptr = 0;
-int counter = 0;
-while (counter < numBuckets) {
-  paths.add(dataPath + "/" + counter);
-  if (counter == numBuckets - 1) {
-// Special treatment for the last bucket
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
-ptr + compressedRecord.length % _bucketSize));
-  } else {
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
-  }
-  ptr += _bucketSize;
-  counter++;
-}
-
-// Do a cleanup of previous data
-if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
-  // Clean-up is not critical so upon failure, we log instead of 
throwing an exception
-  LOG.warn("Failed to clean up previous bucketed data in data path: 
{}", dataPath);
-}
+List<String> paths = new ArrayList<>();
+List<byte[]> buckets = new ArrayList<>();
+
+int ptr = 0;
+int counter = 0;
+while 
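A minimal, self-contained sketch of the bucket-splitting arithmetic this hunk performs (payload size, bucket size, and class name are placeholders, not the PR's code; Math.min stands in for the modulo used for the last bucket in the diff):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BucketSplitSketch {
  public static void main(String[] args) {
    final int bucketSize = 50 * 1024;              // assumed 50KB bucket size
    byte[] compressedRecord = new byte[123_456];   // placeholder payload

    // N = ceil(length / bucketSize), computed with integer arithmetic
    int numBuckets = (compressedRecord.length + bucketSize - 1) / bucketSize;

    List<byte[]> buckets = new ArrayList<>();
    int ptr = 0;
    for (int i = 0; i < numBuckets; i++) {
      // The last bucket only carries the remaining bytes
      int end = Math.min(ptr + bucketSize, compressedRecord.length);
      buckets.add(Arrays.copyOfRange(compressedRecord, ptr, end));
      ptr += bucketSize;
    }
    System.out.println(numBuckets + " buckets; last bucket holds "
        + buckets.get(numBuckets - 1).length + " bytes");
  }
}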

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-18 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r336716790
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -19,56 +19,60 @@
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
-import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.util.GZipCompressionUtil;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable 
{
-  private static Logger LOG = 
LoggerFactory.getLogger(ZkBucketDataAccessor.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZkBucketDataAccessor.class);
 
-  private static final int DEFAULT_NUM_VERSIONS = 2;
+  private static final int DEFAULT_BUCKET_SIZE = 50 * 1024; // 50KB
+  private static final long DEFAULT_VERSION_TTL = 
TimeUnit.MINUTES.toMillis(1L); // 1 min
   private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
   private static final String DATA_SIZE_KEY = "DATA_SIZE";
-  private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
-  private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
+  private static final String METADATA_KEY = "METADATA";
+  private static final String LAST_SUCCESSFUL_WRITE_KEY = 
"LAST_SUCCESSFUL_WRITE";
+  private static final String LAST_WRITE_KEY = "LAST_WRITE";
 
 Review comment:
   Added!





[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-18 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r336716846
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -19,56 +19,60 @@
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
-import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.util.GZipCompressionUtil;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable 
{
-  private static Logger LOG = 
LoggerFactory.getLogger(ZkBucketDataAccessor.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZkBucketDataAccessor.class);
 
-  private static final int DEFAULT_NUM_VERSIONS = 2;
+  private static final int DEFAULT_BUCKET_SIZE = 50 * 1024; // 50KB
+  private static final long DEFAULT_VERSION_TTL = 
TimeUnit.MINUTES.toMillis(1L); // 1 min
   private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
   private static final String DATA_SIZE_KEY = "DATA_SIZE";
-  private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
-  private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
+  private static final String METADATA_KEY = "METADATA";
+  private static final String LAST_SUCCESSFUL_WRITE_KEY = 
"LAST_SUCCESSFUL_WRITE";
+  private static final String LAST_WRITE_KEY = "LAST_WRITE";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  // Thread pool for deleting stale versions
+  private static final ScheduledExecutorService GC_THREAD = 
Executors.newScheduledThreadPool(1);
 
-  // 100 KB for default bucket size
-  private static final int DEFAULT_BUCKET_SIZE = 50 * 1024;
   private final int _bucketSize;
-  private final int _numVersions;
+  private final long _versionTTL;
   private ZkSerializer _zkSerializer;
   private HelixZkClient _zkClient;
-  private HelixZkClient _znRecordClient;
-  private BaseDataAccessor<byte[]> _zkBaseDataAccessor;
-  private BaseDataAccessor<ZNRecord> _znRecordBaseDataAccessor;
+  private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
 
   /**
* Constructor that allows a custom bucket size.
* @param zkAddr
* @param bucketSize
-   * @param numVersions number of versions to store in ZK
+   * @param versionTTL in ms
 
 Review comment:
   Changed





[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-18 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r336716918
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -202,126 +223,158 @@ public void disconnect() {
 if (!_zkClient.isClosed()) {
   _zkClient.close();
 }
-if (!_znRecordClient.isClosed()) {
-  _znRecordClient.close();
-}
   }
 
   private HelixProperty compressedBucketRead(String path) {
-// TODO: Incorporate parallelism into reads instead of locking the whole 
thing against other
-// reads and writes
-if (tryLock(path)) {
-  try {
-// Retrieve the metadata
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  throw new ZkNoNodeException(
-  String.format("Metadata ZNRecord does not exist for path: %s", 
path));
-}
+// 1. Get the version to read
+byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + 
LAST_SUCCESSFUL_WRITE_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryVersionToRead == null) {
+  throw new ZkNoNodeException(
+  String.format("Last successful write ZNode does not exist for path: 
%s", path));
+}
+String versionToRead = new String(binaryVersionToRead);
+
+// 2. Get the metadata map
+byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead 
+ "/" + METADATA_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryMetadata == null) {
+  throw new ZkNoNodeException(
+  String.format("Metadata ZNode does not exist for path: %s", path));
+}
+Map metadata;
+try {
+  metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+} catch (IOException e) {
+  throw new HelixException(String.format("Failed to deserialize path 
metadata: %s!", path), e);
+}
 
-int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, 
-1);
-if (lastSuccessIndex == -1) {
-  throw new HelixException(String.format("Metadata ZNRecord does not 
have %s! Path: %s",
-  LAST_SUCCESS_KEY, path));
-}
-if (bucketSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
-}
-if (dataSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
-}
+// 3. Read the data
+Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+if (bucketSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
+}
+if (dataSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
+}
+int bucketSize = Integer.parseInt((String) bucketSizeObj);
+int dataSize = Integer.parseInt((String) dataSizeObj);
 
-// Compute N - number of buckets
-int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-byte[] compressedRecord = new byte[dataSize];
-String dataPath = path + "/" + lastSuccessIndex;
+// Compute N - number of buckets
+int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+byte[] compressedRecord = new byte[dataSize];
+String dataPath = path + "/" + versionToRead;
 
-List<String> paths = new ArrayList<>();
-for (int i = 0; i < numBuckets; i++) {
-  paths.add(dataPath + "/" + i);
-}
+List<String> paths = new ArrayList<>();
+for (int i = 0; i < numBuckets; i++) {
+  paths.add(dataPath + "/" + i);
+}
 
-// Async get
-List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
-
-// Combine buckets into one byte array
-int copyPtr = 0;
-for (int i = 0; i < numBuckets; i++) {
-  if (i == numBuckets - 1) {
-// Special treatment for the last bucket
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
dataSize % bucketSize);
-  } else {
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
bucketSize);
-copyPtr += bucketSize;
-  }
-}
+// Async get
+List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
+
+// Combine buckets into one byte array
+int copyPtr = 0;
+for (int i = 0; i < numBuckets; i++) {
+  if (i == numBuckets - 1) {
+// 
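A minimal, self-contained sketch of the bucket reassembly this hunk is in the middle of (list contents, sizes, and the class name are placeholders, not the PR's code):

import java.util.Arrays;
import java.util.List;

public class BucketCombineSketch {
  // Stitch the buckets read back from ZK into a single byte[] of length dataSize.
  static byte[] combineBuckets(List<byte[]> buckets, int dataSize, int bucketSize) {
    byte[] combined = new byte[dataSize];
    int copyPtr = 0;
    for (byte[] bucket : buckets) {
      // The last bucket may be partial, so copy only what is still missing.
      int len = Math.min(bucketSize, dataSize - copyPtr);
      System.arraycopy(bucket, 0, combined, copyPtr, len);
      copyPtr += len;
    }
    return combined;
  }

  public static void main(String[] args) {
    byte[] full = combineBuckets(
        Arrays.asList(new byte[]{1, 2, 3}, new byte[]{4, 5}), 5, 3);
    System.out.println(Arrays.toString(full)); // [1, 2, 3, 4, 5]
  }
}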

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-18 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r336713450
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -19,56 +19,60 @@
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
 
 Review comment:
   The com.google.* imports, I think, should go at the top under the new style file.
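   For concreteness, the grouping being suggested would look roughly like this, assuming the new style file orders com.google.* before java.* and org.* (illustrative only, not the checked-in style):

   import com.google.common.collect.ImmutableMap;

   import java.io.IOException;
   import java.util.Map;
   import java.util.concurrent.TimeUnit;

   import org.apache.helix.ZNRecord;
   import org.slf4j.Logger;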





[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-15 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r335245305
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -202,126 +218,156 @@ public void disconnect() {
 if (!_zkClient.isClosed()) {
   _zkClient.close();
 }
-if (!_znRecordClient.isClosed()) {
-  _znRecordClient.close();
-}
   }
 
   private HelixProperty compressedBucketRead(String path) {
-// TODO: Incorporate parallelism into reads instead of locking the whole 
thing against other
-// reads and writes
-if (tryLock(path)) {
-  try {
-// Retrieve the metadata
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  throw new ZkNoNodeException(
-  String.format("Metadata ZNRecord does not exist for path: %s", 
path));
-}
-
-int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, 
-1);
-if (lastSuccessIndex == -1) {
-  throw new HelixException(String.format("Metadata ZNRecord does not 
have %s! Path: %s",
-  LAST_SUCCESS_KEY, path));
-}
-if (bucketSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
-}
-if (dataSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
-}
-
-// Compute N - number of buckets
-int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-byte[] compressedRecord = new byte[dataSize];
-String dataPath = path + "/" + lastSuccessIndex;
+// 1. Get the version to read
+byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + 
LAST_SUCCESSFUL_WRITE_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryVersionToRead == null) {
+  throw new ZkNoNodeException(
+  String.format("Last successful write ZNode does not exist for path: 
%s", path));
+}
+String versionToRead = new String(binaryVersionToRead);
+
+// 2. Get the metadata map
+byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead 
+ "/" + METADATA_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryMetadata == null) {
+  throw new ZkNoNodeException(
+  String.format("Metadata ZNode does not exist for path: %s", path));
+}
+Map metadata;
+try {
+  metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+} catch (IOException e) {
+  throw new HelixException(String.format("Failed to deserialize path 
metadata: %s!", path), e);
+}
 
-List paths = new ArrayList<>();
-for (int i = 0; i < numBuckets; i++) {
-  paths.add(dataPath + "/" + i);
-}
+// 3. Read the data
+Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+if (bucketSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
+}
+if (dataSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
+}
+int bucketSize = Integer.parseInt((String) bucketSizeObj);
+int dataSize = Integer.parseInt((String) dataSizeObj);
 
-// Async get
-List buckets = _zkBaseDataAccessor.get(paths, null, 
AccessOption.PERSISTENT, true);
-
-// Combine buckets into one byte array
-int copyPtr = 0;
-for (int i = 0; i < numBuckets; i++) {
-  if (i == numBuckets - 1) {
-// Special treatment for the last bucket
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
dataSize % bucketSize);
-  } else {
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
bucketSize);
-copyPtr += bucketSize;
-  }
-}
+// Compute N - number of buckets
+int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+byte[] compressedRecord = new byte[dataSize];
+String dataPath = path + "/" + versionToRead;
 
-// Decompress the byte array
-ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(compressedRecord);
-byte[] serializedRecord;
-try {
-  serializedRecord = 
GZipCompressionUtil.uncompress(byteArrayInputStream);
-} catch (IOException e) {
-  throw new HelixException(String.format("Failed to decompress path: 
%s!", 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-15 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r335123036
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -202,126 +222,188 @@ public void disconnect() {
 if (!_zkClient.isClosed()) {
   _zkClient.close();
 }
-if (!_znRecordClient.isClosed()) {
-  _znRecordClient.close();
-}
   }
 
   private HelixProperty compressedBucketRead(String path) {
-// TODO: Incorporate parallelism into reads instead of locking the whole 
thing against other
-// reads and writes
-if (tryLock(path)) {
-  try {
-// Retrieve the metadata
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  throw new ZkNoNodeException(
-  String.format("Metadata ZNRecord does not exist for path: %s", 
path));
-}
-
-int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, 
-1);
-if (lastSuccessIndex == -1) {
-  throw new HelixException(String.format("Metadata ZNRecord does not 
have %s! Path: %s",
-  LAST_SUCCESS_KEY, path));
-}
-if (bucketSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
-}
-if (dataSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
-}
-
-// Compute N - number of buckets
-int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-byte[] compressedRecord = new byte[dataSize];
-String dataPath = path + "/" + lastSuccessIndex;
+// 1. Get the version to read
+byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + 
LAST_SUCCESSFUL_WRITE_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryVersionToRead == null) {
+  throw new ZkNoNodeException(
+  String.format("Last successful write ZNode does not exist for path: 
%s", path));
+}
+String versionToRead = new String(binaryVersionToRead);
+
+// 2. Get the metadata map
+byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead 
+ "/" + METADATA_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryMetadata == null) {
+  throw new ZkNoNodeException(
+  String.format("Metadata ZNode does not exist for path: %s", path));
+}
+Map metadata;
+try {
+  metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+} catch (IOException e) {
+  throw new HelixException(String.format("Failed to deserialize path 
metadata: %s!", path), e);
+}
 
-List paths = new ArrayList<>();
-for (int i = 0; i < numBuckets; i++) {
-  paths.add(dataPath + "/" + i);
-}
+// 3. Read the data
+Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+if (bucketSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
+}
+if (dataSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
+}
+int bucketSize = Integer.parseInt((String) bucketSizeObj);
+int dataSize = Integer.parseInt((String) dataSizeObj);
 
-// Async get
-List buckets = _zkBaseDataAccessor.get(paths, null, 
AccessOption.PERSISTENT, true);
-
-// Combine buckets into one byte array
-int copyPtr = 0;
-for (int i = 0; i < numBuckets; i++) {
-  if (i == numBuckets - 1) {
-// Special treatment for the last bucket
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
dataSize % bucketSize);
-  } else {
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
bucketSize);
-copyPtr += bucketSize;
-  }
-}
+// Compute N - number of buckets
+int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+byte[] compressedRecord = new byte[dataSize];
+String dataPath = path + "/" + versionToRead;
 
-// Decompress the byte array
-ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(compressedRecord);
-byte[] serializedRecord;
-try {
-  serializedRecord = 
GZipCompressionUtil.uncompress(byteArrayInputStream);
-} catch (IOException e) {
-  throw new HelixException(String.format("Failed to decompress path: 
%s!", 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-15 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r335120578
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +87,121 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
+_versionTTL = versionTTL;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
   }
 
   @Override
  public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
   throws IOException {
-// Take the ZNrecord and serialize it (get byte[])
+final long timestamp = System.currentTimeMillis();
+
+DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+  if (dataInZk == null || dataInZk.length == 0) {
+// No last write version exists, so start with 0
+return ("0_" + timestamp).getBytes();
+  }
+  // Last write exists, so increment and write it back with a timestamp
+  String lastWriteVersionStr = new String(dataInZk);
+  long lastWriteVersion = extractVersion(lastWriteVersionStr);
+  lastWriteVersion++;
+  return (lastWriteVersion + "_" + timestamp).getBytes();
+};
+
+// 1. Increment lastWriteVersion using DataUpdater
+ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor
+.doUpdate(path + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, 
AccessOption.PERSISTENT);
+if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
+  throw new HelixException(
+  String.format("Failed to write the write version at path: %s!", 
path));
+}
+
+// Successfully reserved a version number
+byte[] binaryVersion = (byte[]) result._updatedValue;
+final String timestampedVersion = new String(binaryVersion);
+final long version = extractVersion(timestampedVersion);
+
+// 2. Write to the incremented last write version with timestamp for TTL
+String versionedDataPath = path + "/" + timestampedVersion;
+
+// Take the ZNRecord and serialize it (get byte[])
 byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
 // Compress the byte[]
 byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
 // Compute N - number of buckets
 int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
 
-if (tryLock(path)) {
-  try {
-// Read or initialize metadata and compute the last success version 
index
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  metadataRecord = new ZNRecord(extractIdFromPath(path));
-}
-int lastSuccessIndex =
-(metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % 
_numVersions;
-String dataPath = path + "/" + lastSuccessIndex;
-
-List paths = new ArrayList<>();
-List buckets = new ArrayList<>();
-
-int ptr = 0;
-int counter = 0;
-while (counter < numBuckets) {
-  paths.add(dataPath + "/" + counter);
-  if (counter == numBuckets - 1) {
-// Special treatment for the last bucket
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
-ptr + compressedRecord.length % _bucketSize));
-  } else {
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
-  }
-  ptr += _bucketSize;
-  counter++;
-}
-
-// Do a cleanup of previous data
-if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
-  // Clean-up is not critical so upon failure, we log instead of 
throwing an exception
-  LOG.warn("Failed to clean up previous bucketed data in data path: 
{}", dataPath);
-}
+List paths = new ArrayList<>();
+List buckets = new ArrayList<>();
+
+int ptr = 0;
+int counter = 0;
+while (counter < 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-15 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r335120079
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +87,121 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
+_versionTTL = versionTTL;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
   }
 
   @Override
  public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
   throws IOException {
-// Take the ZNrecord and serialize it (get byte[])
+final long timestamp = System.currentTimeMillis();
+
+DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+  if (dataInZk == null || dataInZk.length == 0) {
+// No last write version exists, so start with 0
+return ("0_" + timestamp).getBytes();
+  }
+  // Last write exists, so increment and write it back with a timestamp
+  String lastWriteVersionStr = new String(dataInZk);
+  long lastWriteVersion = extractVersion(lastWriteVersionStr);
+  lastWriteVersion++;
+  return (lastWriteVersion + "_" + timestamp).getBytes();
+};
+
+// 1. Increment lastWriteVersion using DataUpdater
+ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor
+.doUpdate(path + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, 
AccessOption.PERSISTENT);
+if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
+  throw new HelixException(
+  String.format("Failed to write the write version at path: %s!", 
path));
+}
+
+// Successfully reserved a version number
+byte[] binaryVersion = (byte[]) result._updatedValue;
+final String timestampedVersion = new String(binaryVersion);
+final long version = extractVersion(timestampedVersion);
+
+// 2. Write to the incremented last write version with timestamp for TTL
+String versionedDataPath = path + "/" + timestampedVersion;
+
+// Take the ZNRecord and serialize it (get byte[])
 byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
 // Compress the byte[]
 byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
 // Compute N - number of buckets
 int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
 
-if (tryLock(path)) {
-  try {
-// Read or initialize metadata and compute the last success version 
index
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  metadataRecord = new ZNRecord(extractIdFromPath(path));
-}
-int lastSuccessIndex =
-(metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % 
_numVersions;
-String dataPath = path + "/" + lastSuccessIndex;
-
-List paths = new ArrayList<>();
-List buckets = new ArrayList<>();
-
-int ptr = 0;
-int counter = 0;
-while (counter < numBuckets) {
-  paths.add(dataPath + "/" + counter);
-  if (counter == numBuckets - 1) {
-// Special treatment for the last bucket
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
-ptr + compressedRecord.length % _bucketSize));
-  } else {
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
-  }
-  ptr += _bucketSize;
-  counter++;
-}
-
-// Do a cleanup of previous data
-if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
-  // Clean-up is not critical so upon failure, we log instead of 
throwing an exception
-  LOG.warn("Failed to clean up previous bucketed data in data path: 
{}", dataPath);
-}
+List paths = new ArrayList<>();
+List buckets = new ArrayList<>();
+
+int ptr = 0;
+int counter = 0;
+while (counter < 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-15 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r335118924
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -202,126 +218,156 @@ public void disconnect() {
 if (!_zkClient.isClosed()) {
   _zkClient.close();
 }
-if (!_znRecordClient.isClosed()) {
-  _znRecordClient.close();
-}
   }
 
   private HelixProperty compressedBucketRead(String path) {
-// TODO: Incorporate parallelism into reads instead of locking the whole 
thing against other
-// reads and writes
-if (tryLock(path)) {
-  try {
-// Retrieve the metadata
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  throw new ZkNoNodeException(
-  String.format("Metadata ZNRecord does not exist for path: %s", 
path));
-}
-
-int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, 
-1);
-if (lastSuccessIndex == -1) {
-  throw new HelixException(String.format("Metadata ZNRecord does not 
have %s! Path: %s",
-  LAST_SUCCESS_KEY, path));
-}
-if (bucketSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
-}
-if (dataSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
-}
-
-// Compute N - number of buckets
-int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-byte[] compressedRecord = new byte[dataSize];
-String dataPath = path + "/" + lastSuccessIndex;
+// 1. Get the version to read
+byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + 
LAST_SUCCESSFUL_WRITE_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryVersionToRead == null) {
+  throw new ZkNoNodeException(
+  String.format("Last successful write ZNode does not exist for path: 
%s", path));
+}
+String versionToRead = new String(binaryVersionToRead);
+
+// 2. Get the metadata map
+byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead 
+ "/" + METADATA_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryMetadata == null) {
+  throw new ZkNoNodeException(
+  String.format("Metadata ZNode does not exist for path: %s", path));
+}
+Map metadata;
+try {
+  metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+} catch (IOException e) {
+  throw new HelixException(String.format("Failed to deserialize path 
metadata: %s!", path), e);
+}
 
-List paths = new ArrayList<>();
-for (int i = 0; i < numBuckets; i++) {
-  paths.add(dataPath + "/" + i);
-}
+// 3. Read the data
+Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+if (bucketSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
+}
+if (dataSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
+}
+int bucketSize = Integer.parseInt((String) bucketSizeObj);
+int dataSize = Integer.parseInt((String) dataSizeObj);
 
-// Async get
-List buckets = _zkBaseDataAccessor.get(paths, null, 
AccessOption.PERSISTENT, true);
-
-// Combine buckets into one byte array
-int copyPtr = 0;
-for (int i = 0; i < numBuckets; i++) {
-  if (i == numBuckets - 1) {
-// Special treatment for the last bucket
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
dataSize % bucketSize);
-  } else {
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
bucketSize);
-copyPtr += bucketSize;
-  }
-}
+// Compute N - number of buckets
+int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+byte[] compressedRecord = new byte[dataSize];
+String dataPath = path + "/" + versionToRead;
 
-// Decompress the byte array
-ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(compressedRecord);
-byte[] serializedRecord;
-try {
-  serializedRecord = 
GZipCompressionUtil.uncompress(byteArrayInputStream);
-} catch (IOException e) {
-  throw new HelixException(String.format("Failed to decompress path: 
%s!", 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-14 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r334762342
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +87,121 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
+_versionTTL = versionTTL;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
   }
 
   @Override
  public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
   throws IOException {
-// Take the ZNrecord and serialize it (get byte[])
+final long timestamp = System.currentTimeMillis();
+
+DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+  if (dataInZk == null || dataInZk.length == 0) {
+// No last write version exists, so start with 0
+return ("0_" + timestamp).getBytes();
 
 Review comment:
   It's possible if we consider cases where there are multiple writers. Of course, with a single writer, using a timestamp would work and would simplify things. But the purpose of this change is to increase parallelism, which means we want to allow multiple writers (whether or not they are in the same data center).
   
   Even if we had multiple writers in the same data center, NTP usually doesn't 
guarantee strict ordering of timestamps either.
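   A minimal sketch of the idea, assuming the I0Itec DataUpdater interface and the version format used in this diff (the substring parsing stands in for the PR's extractVersion() helper; the class and field names are made up for illustration):

import org.I0Itec.zkclient.DataUpdater;

public class VersionCounterSketch {
  // Each writer reserves a version by atomically incrementing a counter stored in ZK
  // (applied via ZkBaseDataAccessor.doUpdate), instead of ordering writes by wall-clock
  // time, so concurrent writers get distinct, totally ordered versions even across hosts
  // with skewed clocks. The "_<timestamp>" suffix is only used for TTL-based cleanup,
  // not for ordering.
  static final DataUpdater<byte[]> LAST_WRITE_UPDATER = dataInZk -> {
    long timestamp = System.currentTimeMillis();
    if (dataInZk == null || dataInZk.length == 0) {
      return ("0_" + timestamp).getBytes();
    }
    String current = new String(dataInZk);
    long version = Long.parseLong(current.substring(0, current.indexOf('_')));
    return ((version + 1) + "_" + timestamp).getBytes();
  };

  public static void main(String[] args) {
    // Simulate two successive updates as ZooKeeper would apply them server-side.
    byte[] v0 = LAST_WRITE_UPDATER.update(null);
    byte[] v1 = LAST_WRITE_UPDATER.update(v0);
    System.out.println(new String(v0) + " -> " + new String(v1));
  }
}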






[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-14 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r334720480
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +87,121 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
+_versionTTL = versionTTL;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
   }
 
   @Override
  public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
   throws IOException {
-// Take the ZNrecord and serialize it (get byte[])
+final long timestamp = System.currentTimeMillis();
+
+DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+  if (dataInZk == null || dataInZk.length == 0) {
+// No last write version exists, so start with 0
+return ("0_" + timestamp).getBytes();
+  }
+  // Last write exists, so increment and write it back with a timestamp
+  String lastWriteVersionStr = new String(dataInZk);
+  long lastWriteVersion = extractVersion(lastWriteVersionStr);
+  lastWriteVersion++;
+  return (lastWriteVersion + "_" + timestamp).getBytes();
+};
+
+// 1. Increment lastWriteVersion using DataUpdater
+ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor
+.doUpdate(path + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, 
AccessOption.PERSISTENT);
+if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
 
 Review comment:
   _retCode is a public field.





[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-14 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r334720739
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -202,126 +218,156 @@ public void disconnect() {
 if (!_zkClient.isClosed()) {
   _zkClient.close();
 }
-if (!_znRecordClient.isClosed()) {
-  _znRecordClient.close();
-}
   }
 
   private HelixProperty compressedBucketRead(String path) {
-// TODO: Incorporate parallelism into reads instead of locking the whole 
thing against other
-// reads and writes
-if (tryLock(path)) {
-  try {
-// Retrieve the metadata
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  throw new ZkNoNodeException(
-  String.format("Metadata ZNRecord does not exist for path: %s", 
path));
-}
-
-int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, 
-1);
-if (lastSuccessIndex == -1) {
-  throw new HelixException(String.format("Metadata ZNRecord does not 
have %s! Path: %s",
-  LAST_SUCCESS_KEY, path));
-}
-if (bucketSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
-}
-if (dataSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
-}
-
-// Compute N - number of buckets
-int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-byte[] compressedRecord = new byte[dataSize];
-String dataPath = path + "/" + lastSuccessIndex;
+// 1. Get the version to read
+byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + 
LAST_SUCCESSFUL_WRITE_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryVersionToRead == null) {
+  throw new ZkNoNodeException(
+  String.format("Last successful write ZNode does not exist for path: 
%s", path));
+}
+String versionToRead = new String(binaryVersionToRead);
+
+// 2. Get the metadata map
+byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead 
+ "/" + METADATA_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryMetadata == null) {
+  throw new ZkNoNodeException(
+  String.format("Metadata ZNode does not exist for path: %s", path));
+}
+Map metadata;
+try {
+  metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+} catch (IOException e) {
+  throw new HelixException(String.format("Failed to deserialize path 
metadata: %s!", path), e);
+}
 
-List paths = new ArrayList<>();
-for (int i = 0; i < numBuckets; i++) {
-  paths.add(dataPath + "/" + i);
-}
+// 3. Read the data
+Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+if (bucketSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
+}
+if (dataSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
+}
+int bucketSize = Integer.parseInt((String) bucketSizeObj);
+int dataSize = Integer.parseInt((String) dataSizeObj);
 
-// Async get
-List buckets = _zkBaseDataAccessor.get(paths, null, 
AccessOption.PERSISTENT, true);
-
-// Combine buckets into one byte array
-int copyPtr = 0;
-for (int i = 0; i < numBuckets; i++) {
-  if (i == numBuckets - 1) {
-// Special treatment for the last bucket
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
dataSize % bucketSize);
-  } else {
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
bucketSize);
-copyPtr += bucketSize;
-  }
-}
+// Compute N - number of buckets
+int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+byte[] compressedRecord = new byte[dataSize];
+String dataPath = path + "/" + versionToRead;
 
-// Decompress the byte array
-ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(compressedRecord);
-byte[] serializedRecord;
-try {
-  serializedRecord = 
GZipCompressionUtil.uncompress(byteArrayInputStream);
-} catch (IOException e) {
-  throw new HelixException(String.format("Failed to decompress path: 
%s!", 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-14 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r334720366
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +87,121 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
+_versionTTL = versionTTL;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
   }
 
   @Override
   public  boolean compressedBucketWrite(String path, 
T value)
   throws IOException {
-// Take the ZNrecord and serialize it (get byte[])
+final long timestamp = System.currentTimeMillis();
+
+DataUpdater lastWriteVersionUpdater = dataInZk -> {
+  if (dataInZk == null || dataInZk.length == 0) {
+// No last write version exists, so start with 0
+return ("0_" + timestamp).getBytes();
 
 Review comment:
   Please see https://github.com/apache/helix/wiki/Concurrency-and-Parallelism-for-BucketDataAccessor for details.
   Timestamps cannot be used to produce monotonically-increasing sequences because of clock drift.
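   As a concrete illustration of the "version_timestamp" format used in this hunk: only the version part is compared for ordering; the timestamp part is informational (e.g. for TTL-based cleanup of stale versions). The helpers below are assumptions sketching what extractVersion and a matching timestamp accessor might look like; the real implementations are not shown in this hunk.

// Hypothetical helpers for the "version_timestamp" string stored in the last-write ZNode
static long extractVersion(String lastWriteVersionStr) {
  // Ordering comes solely from this monotonically-incremented counter
  return Long.parseLong(lastWriteVersionStr.split("_")[0]);
}

static long extractTimestamp(String lastWriteVersionStr) {
  // Never compared for ordering (clock drift); only records when the write happened
  return Long.parseLong(lastWriteVersionStr.split("_")[1]);
}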





[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-14 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r334613185
 
 

 ##
 File path: 
helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
 ##
 @@ -69,17 +98,43 @@ public void afterClass() {
*/
   @Test
   public void testCompressedBucketWrite() throws IOException {
-ZNRecord record = new ZNRecord(NAME_KEY);
-record.setSimpleField(NAME_KEY, NAME_KEY);
-record.setListField(NAME_KEY, LIST_FIELD);
-record.setMapField(NAME_KEY, MAP_FIELD);
 Assert.assertTrue(_bucketDataAccessor.compressedBucketWrite(PATH, new 
HelixProperty(record)));
   }
 
+  @Test(dependsOnMethods = "testCompressedBucketWrite")
+  public void testMultipleWrites() throws Exception {
+int count = 50;
+
+// Write 10 times
 
 Review comment:
   TODO: Fix this comment - the test writes 50 times (count = 50), not 10.
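   A sketch of what the corrected comment and loop might look like; the loop body is not shown in this hunk, so this is only an assumed shape built from the write call used earlier in this test.

int count = 50;

// Write 'count' times; each write should bump the version under PATH
for (int i = 0; i < count; i++) {
  Assert.assertTrue(_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)));
}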





[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-11 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r334212431
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -202,126 +218,156 @@ public void disconnect() {
 if (!_zkClient.isClosed()) {
   _zkClient.close();
 }
-if (!_znRecordClient.isClosed()) {
-  _znRecordClient.close();
-}
   }
 
   private HelixProperty compressedBucketRead(String path) {
-// TODO: Incorporate parallelism into reads instead of locking the whole 
thing against other
-// reads and writes
-if (tryLock(path)) {
-  try {
-// Retrieve the metadata
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  throw new ZkNoNodeException(
-  String.format("Metadata ZNRecord does not exist for path: %s", 
path));
-}
-
-int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, 
-1);
-if (lastSuccessIndex == -1) {
-  throw new HelixException(String.format("Metadata ZNRecord does not 
have %s! Path: %s",
-  LAST_SUCCESS_KEY, path));
-}
-if (bucketSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
-}
-if (dataSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
-}
-
-// Compute N - number of buckets
-int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-byte[] compressedRecord = new byte[dataSize];
-String dataPath = path + "/" + lastSuccessIndex;
+// 1. Get the version to read
+byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + 
LAST_SUCCESSFUL_WRITE_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryVersionToRead == null) {
+  throw new ZkNoNodeException(
+  String.format("Last successful write ZNode does not exist for path: 
%s", path));
+}
+String versionToRead = new String(binaryVersionToRead);
+
+// 2. Get the metadata map
+byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead 
+ "/" + METADATA_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryMetadata == null) {
+  throw new ZkNoNodeException(
+  String.format("Metadata ZNode does not exist for path: %s", path));
+}
+Map metadata;
+try {
+  metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+} catch (IOException e) {
+  throw new HelixException(String.format("Failed to deserialize path 
metadata: %s!", path), e);
+}
 
-List paths = new ArrayList<>();
-for (int i = 0; i < numBuckets; i++) {
-  paths.add(dataPath + "/" + i);
-}
+// 3. Read the data
+Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+if (bucketSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
+}
+if (dataSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
+}
+int bucketSize = Integer.parseInt((String) bucketSizeObj);
+int dataSize = Integer.parseInt((String) dataSizeObj);
 
-// Async get
-List buckets = _zkBaseDataAccessor.get(paths, null, 
AccessOption.PERSISTENT, true);
-
-// Combine buckets into one byte array
-int copyPtr = 0;
-for (int i = 0; i < numBuckets; i++) {
-  if (i == numBuckets - 1) {
-// Special treatment for the last bucket
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
dataSize % bucketSize);
-  } else {
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
bucketSize);
-copyPtr += bucketSize;
-  }
-}
+// Compute N - number of buckets
+int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+byte[] compressedRecord = new byte[dataSize];
+String dataPath = path + "/" + versionToRead;
 
-// Decompress the byte array
-ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(compressedRecord);
-byte[] serializedRecord;
-try {
-  serializedRecord = 
GZipCompressionUtil.uncompress(byteArrayInputStream);
-} catch (IOException e) {
-  throw new HelixException(String.format("Failed to decompress path: 
%s!", 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-11 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r334212180
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -202,126 +218,156 @@ public void disconnect() {
 if (!_zkClient.isClosed()) {
   _zkClient.close();
 }
-if (!_znRecordClient.isClosed()) {
-  _znRecordClient.close();
-}
   }
 
   private HelixProperty compressedBucketRead(String path) {
-// TODO: Incorporate parallelism into reads instead of locking the whole 
thing against other
-// reads and writes
-if (tryLock(path)) {
-  try {
-// Retrieve the metadata
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  throw new ZkNoNodeException(
-  String.format("Metadata ZNRecord does not exist for path: %s", 
path));
-}
-
-int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, 
-1);
-if (lastSuccessIndex == -1) {
-  throw new HelixException(String.format("Metadata ZNRecord does not 
have %s! Path: %s",
-  LAST_SUCCESS_KEY, path));
-}
-if (bucketSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
-}
-if (dataSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
-}
-
-// Compute N - number of buckets
-int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-byte[] compressedRecord = new byte[dataSize];
-String dataPath = path + "/" + lastSuccessIndex;
+// 1. Get the version to read
+byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + 
LAST_SUCCESSFUL_WRITE_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryVersionToRead == null) {
+  throw new ZkNoNodeException(
+  String.format("Last successful write ZNode does not exist for path: 
%s", path));
+}
+String versionToRead = new String(binaryVersionToRead);
+
+// 2. Get the metadata map
+byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead 
+ "/" + METADATA_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryMetadata == null) {
+  throw new ZkNoNodeException(
+  String.format("Metadata ZNode does not exist for path: %s", path));
+}
+Map metadata;
+try {
+  metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+} catch (IOException e) {
+  throw new HelixException(String.format("Failed to deserialize path 
metadata: %s!", path), e);
+}
 
-List paths = new ArrayList<>();
-for (int i = 0; i < numBuckets; i++) {
-  paths.add(dataPath + "/" + i);
-}
+// 3. Read the data
+Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+if (bucketSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
+}
+if (dataSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
+}
+int bucketSize = Integer.parseInt((String) bucketSizeObj);
+int dataSize = Integer.parseInt((String) dataSizeObj);
 
-// Async get
-List buckets = _zkBaseDataAccessor.get(paths, null, 
AccessOption.PERSISTENT, true);
-
-// Combine buckets into one byte array
-int copyPtr = 0;
-for (int i = 0; i < numBuckets; i++) {
-  if (i == numBuckets - 1) {
-// Special treatment for the last bucket
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
dataSize % bucketSize);
-  } else {
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
bucketSize);
-copyPtr += bucketSize;
-  }
-}
+// Compute N - number of buckets
+int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+byte[] compressedRecord = new byte[dataSize];
+String dataPath = path + "/" + versionToRead;
 
-// Decompress the byte array
-ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(compressedRecord);
-byte[] serializedRecord;
-try {
-  serializedRecord = 
GZipCompressionUtil.uncompress(byteArrayInputStream);
-} catch (IOException e) {
-  throw new HelixException(String.format("Failed to decompress path: 
%s!", 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-11 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r334213668
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -202,126 +218,156 @@ public void disconnect() {
 if (!_zkClient.isClosed()) {
   _zkClient.close();
 }
-if (!_znRecordClient.isClosed()) {
-  _znRecordClient.close();
-}
   }
 
   private HelixProperty compressedBucketRead(String path) {
-// TODO: Incorporate parallelism into reads instead of locking the whole 
thing against other
-// reads and writes
-if (tryLock(path)) {
-  try {
-// Retrieve the metadata
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  throw new ZkNoNodeException(
-  String.format("Metadata ZNRecord does not exist for path: %s", 
path));
-}
-
-int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, 
-1);
-if (lastSuccessIndex == -1) {
-  throw new HelixException(String.format("Metadata ZNRecord does not 
have %s! Path: %s",
-  LAST_SUCCESS_KEY, path));
-}
-if (bucketSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
-}
-if (dataSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
-}
-
-// Compute N - number of buckets
-int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-byte[] compressedRecord = new byte[dataSize];
-String dataPath = path + "/" + lastSuccessIndex;
+// 1. Get the version to read
+byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + 
LAST_SUCCESSFUL_WRITE_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryVersionToRead == null) {
+  throw new ZkNoNodeException(
+  String.format("Last successful write ZNode does not exist for path: 
%s", path));
+}
+String versionToRead = new String(binaryVersionToRead);
+
+// 2. Get the metadata map
+byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead 
+ "/" + METADATA_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryMetadata == null) {
+  throw new ZkNoNodeException(
+  String.format("Metadata ZNode does not exist for path: %s", path));
+}
+Map metadata;
+try {
+  metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+} catch (IOException e) {
+  throw new HelixException(String.format("Failed to deserialize path 
metadata: %s!", path), e);
+}
 
-List paths = new ArrayList<>();
-for (int i = 0; i < numBuckets; i++) {
-  paths.add(dataPath + "/" + i);
-}
+// 3. Read the data
+Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+if (bucketSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
+}
+if (dataSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
+}
+int bucketSize = Integer.parseInt((String) bucketSizeObj);
+int dataSize = Integer.parseInt((String) dataSizeObj);
 
-// Async get
-List buckets = _zkBaseDataAccessor.get(paths, null, 
AccessOption.PERSISTENT, true);
-
-// Combine buckets into one byte array
-int copyPtr = 0;
-for (int i = 0; i < numBuckets; i++) {
-  if (i == numBuckets - 1) {
-// Special treatment for the last bucket
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
dataSize % bucketSize);
-  } else {
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
bucketSize);
-copyPtr += bucketSize;
-  }
-}
+// Compute N - number of buckets
+int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+byte[] compressedRecord = new byte[dataSize];
+String dataPath = path + "/" + versionToRead;
 
-// Decompress the byte array
-ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(compressedRecord);
-byte[] serializedRecord;
-try {
-  serializedRecord = 
GZipCompressionUtil.uncompress(byteArrayInputStream);
-} catch (IOException e) {
-  throw new HelixException(String.format("Failed to decompress path: 
%s!", 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-10 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r333777205
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +83,117 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE);
   }
 
   @Override
   public  boolean compressedBucketWrite(String path, 
T value)
   throws IOException {
+AtomicLong versionRef = new AtomicLong();
+DataUpdater lastWriteVersionUpdater = dataInZk -> {
+  if (dataInZk == null || dataInZk.length == 0) {
+// No last write version exists, so start with 0
+return Longs.toByteArray(0);
+  }
+  // Last write exists, so increment and write it back
+  long lastWriteVersion = Longs.fromByteArray(dataInZk);
+  lastWriteVersion++;
+  // Set the AtomicReference
+  versionRef.set(lastWriteVersion);
+  return Longs.toByteArray(lastWriteVersion);
+};
+
+// 1. Increment lastWriteVersion using DataUpdater
+if (!_zkBaseDataAccessor.update(path + "/" + LAST_WRITE_KEY, 
lastWriteVersionUpdater,
+AccessOption.PERSISTENT)) {
+  throw new HelixException(
+  String.format("Failed to write the write version at path: %s!", 
path));
+}
+// Successfully reserved a version number
+final long version = versionRef.get();
+
+// 2. Write to the incremented last write version
+String versionedDataPath = path + "/" + version;
+
 // Take the ZNrecord and serialize it (get byte[])
 byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
 // Compress the byte[]
 byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
 // Compute N - number of buckets
 int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
 
-if (tryLock(path)) {
-  try {
-// Read or initialize metadata and compute the last success version 
index
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  metadataRecord = new ZNRecord(extractIdFromPath(path));
-}
-int lastSuccessIndex =
-(metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % 
_numVersions;
-String dataPath = path + "/" + lastSuccessIndex;
-
-List paths = new ArrayList<>();
-List buckets = new ArrayList<>();
-
-int ptr = 0;
-int counter = 0;
-while (counter < numBuckets) {
-  paths.add(dataPath + "/" + counter);
-  if (counter == numBuckets - 1) {
-// Special treatment for the last bucket
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
-ptr + compressedRecord.length % _bucketSize));
-  } else {
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
-  }
-  ptr += _bucketSize;
-  counter++;
-}
+List paths = new ArrayList<>();
+List buckets = new ArrayList<>();
+
+int ptr = 0;
+int counter = 0;
+while (counter < numBuckets) {
+  paths.add(versionedDataPath + "/" + counter);
+  if (counter == numBuckets - 1) {
+// Special treatment for the last bucket
+buckets.add(
+Arrays.copyOfRange(compressedRecord, ptr, ptr + 
compressedRecord.length % _bucketSize));
+  } else {
+buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
+  }
+  ptr += _bucketSize;
+  counter++;
+}
 
-// Do a cleanup of previous data
-if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
-  // Clean-up is not critical so upon failure, we log instead of 
throwing an exception
-  LOG.warn("Failed to clean up previous bucketed data in data path: 
{}", 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-10 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r333771838
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +83,117 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE);
   }
 
   @Override
   public  boolean compressedBucketWrite(String path, 
T value)
   throws IOException {
+AtomicLong versionRef = new AtomicLong();
+DataUpdater lastWriteVersionUpdater = dataInZk -> {
+  if (dataInZk == null || dataInZk.length == 0) {
+// No last write version exists, so start with 0
+return Longs.toByteArray(0);
+  }
+  // Last write exists, so increment and write it back
+  long lastWriteVersion = Longs.fromByteArray(dataInZk);
+  lastWriteVersion++;
+  // Set the AtomicReference
+  versionRef.set(lastWriteVersion);
+  return Longs.toByteArray(lastWriteVersion);
+};
+
+// 1. Increment lastWriteVersion using DataUpdater
+if (!_zkBaseDataAccessor.update(path + "/" + LAST_WRITE_KEY, 
lastWriteVersionUpdater,
+AccessOption.PERSISTENT)) {
+  throw new HelixException(
+  String.format("Failed to write the write version at path: %s!", 
path));
+}
+// Successfully reserved a version number
+final long version = versionRef.get();
+
+// 2. Write to the incremented last write version
+String versionedDataPath = path + "/" + version;
+
 // Take the ZNrecord and serialize it (get byte[])
 byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
 // Compress the byte[]
 byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
 // Compute N - number of buckets
 int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
 
-if (tryLock(path)) {
-  try {
-// Read or initialize metadata and compute the last success version 
index
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  metadataRecord = new ZNRecord(extractIdFromPath(path));
-}
-int lastSuccessIndex =
-(metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % 
_numVersions;
-String dataPath = path + "/" + lastSuccessIndex;
-
-List paths = new ArrayList<>();
-List buckets = new ArrayList<>();
-
-int ptr = 0;
-int counter = 0;
-while (counter < numBuckets) {
-  paths.add(dataPath + "/" + counter);
-  if (counter == numBuckets - 1) {
-// Special treatment for the last bucket
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
-ptr + compressedRecord.length % _bucketSize));
-  } else {
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
-  }
-  ptr += _bucketSize;
-  counter++;
-}
+List paths = new ArrayList<>();
+List buckets = new ArrayList<>();
+
+int ptr = 0;
+int counter = 0;
+while (counter < numBuckets) {
+  paths.add(versionedDataPath + "/" + counter);
+  if (counter == numBuckets - 1) {
+// Special treatment for the last bucket
+buckets.add(
+Arrays.copyOfRange(compressedRecord, ptr, ptr + 
compressedRecord.length % _bucketSize));
+  } else {
+buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
+  }
+  ptr += _bucketSize;
+  counter++;
+}
 
-// Do a cleanup of previous data
-if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
-  // Clean-up is not critical so upon failure, we log instead of 
throwing an exception
-  LOG.warn("Failed to clean up previous bucketed data in data path: 
{}", 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-10 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r333773875
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +83,117 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE);
   }
 
   @Override
   public  boolean compressedBucketWrite(String path, 
T value)
   throws IOException {
+AtomicLong versionRef = new AtomicLong();
+DataUpdater lastWriteVersionUpdater = dataInZk -> {
 
 Review comment:
   _baseDataAccessor only takes a byte array. I think that having one data accessor is more beneficial.
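   A rough sketch of how a single byte[]-based accessor can still cover ZNRecord payloads by funneling everything through the Jackson serializer already used in this class (the path and variable names here are placeholders):

ZkSerializer zkSerializer = new ZNRecordJacksonSerializer();

// ZNRecord -> byte[] before writing, so one byte[] accessor handles both data and metadata
byte[] serialized = zkSerializer.serialize(value.getRecord());
_zkBaseDataAccessor.set(somePath, serialized, AccessOption.PERSISTENT);

// byte[] -> ZNRecord on the way back out
byte[] raw = _zkBaseDataAccessor.get(somePath, null, AccessOption.PERSISTENT);
ZNRecord restored = (ZNRecord) zkSerializer.deserialize(raw);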





[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-10 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r333775129
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +83,117 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE);
   }
 
   @Override
   public  boolean compressedBucketWrite(String path, 
T value)
   throws IOException {
+AtomicLong versionRef = new AtomicLong();
+DataUpdater lastWriteVersionUpdater = dataInZk -> {
+  if (dataInZk == null || dataInZk.length == 0) {
+// No last write version exists, so start with 0
+return Longs.toByteArray(0);
+  }
+  // Last write exists, so increment and write it back
+  long lastWriteVersion = Longs.fromByteArray(dataInZk);
+  lastWriteVersion++;
+  // Set the AtomicReference
+  versionRef.set(lastWriteVersion);
+  return Longs.toByteArray(lastWriteVersion);
+};
+
+// 1. Increment lastWriteVersion using DataUpdater
+if (!_zkBaseDataAccessor.update(path + "/" + LAST_WRITE_KEY, 
lastWriteVersionUpdater,
 
 Review comment:
   Sure. That also works.
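   For context, a sketch of the alternative being agreed to here: doUpdate() hands back the value the updater wrote, so the reserved version can be read from the result instead of being captured in an AtomicLong inside the updater (API names taken from the other hunks in this thread):

ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor.doUpdate(
    path + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, AccessOption.PERSISTENT);
if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
  throw new HelixException(
      String.format("Failed to write the write version at path: %s!", path));
}
// No AtomicLong needed: the reserved version is whatever the updater just wrote
final long version = Longs.fromByteArray((byte[]) result._updatedValue);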





[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-10 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r333772078
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -85,103 +83,117 @@ public Object deserialize(byte[] data) throws 
ZkMarshallingError {
 return data;
   }
 });
-_zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-// TODO: Optimize serialization with Jackson
-// TODO: Or use a better binary serialization protocol
-// TODO: Consider making this also binary
-// TODO: Consider an async write for the metadata as well
-_znRecordClient = SharedZkClientFactory.getInstance()
-.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-_znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-_znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
 _zkSerializer = new ZNRecordJacksonSerializer();
 _bucketSize = bucketSize;
-_numVersions = numVersions;
   }
 
   /**
* Constructor that uses a default bucket size.
* @param zkAddr
*/
   public ZkBucketDataAccessor(String zkAddr) {
-this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+this(zkAddr, DEFAULT_BUCKET_SIZE);
   }
 
   @Override
   public  boolean compressedBucketWrite(String path, 
T value)
   throws IOException {
+AtomicLong versionRef = new AtomicLong();
+DataUpdater lastWriteVersionUpdater = dataInZk -> {
+  if (dataInZk == null || dataInZk.length == 0) {
+// No last write version exists, so start with 0
+return Longs.toByteArray(0);
+  }
+  // Last write exists, so increment and write it back
+  long lastWriteVersion = Longs.fromByteArray(dataInZk);
+  lastWriteVersion++;
+  // Set the AtomicReference
+  versionRef.set(lastWriteVersion);
+  return Longs.toByteArray(lastWriteVersion);
+};
+
+// 1. Increment lastWriteVersion using DataUpdater
+if (!_zkBaseDataAccessor.update(path + "/" + LAST_WRITE_KEY, 
lastWriteVersionUpdater,
+AccessOption.PERSISTENT)) {
+  throw new HelixException(
+  String.format("Failed to write the write version at path: %s!", 
path));
+}
+// Successfully reserved a version number
+final long version = versionRef.get();
+
+// 2. Write to the incremented last write version
+String versionedDataPath = path + "/" + version;
+
 // Take the ZNrecord and serialize it (get byte[])
 byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
 // Compress the byte[]
 byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
 // Compute N - number of buckets
 int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
 
-if (tryLock(path)) {
-  try {
-// Read or initialize metadata and compute the last success version 
index
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  metadataRecord = new ZNRecord(extractIdFromPath(path));
-}
-int lastSuccessIndex =
-(metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % 
_numVersions;
-String dataPath = path + "/" + lastSuccessIndex;
-
-List paths = new ArrayList<>();
-List buckets = new ArrayList<>();
-
-int ptr = 0;
-int counter = 0;
-while (counter < numBuckets) {
-  paths.add(dataPath + "/" + counter);
-  if (counter == numBuckets - 1) {
-// Special treatment for the last bucket
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
-ptr + compressedRecord.length % _bucketSize));
-  } else {
-buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
-  }
-  ptr += _bucketSize;
-  counter++;
-}
+List paths = new ArrayList<>();
+List buckets = new ArrayList<>();
+
+int ptr = 0;
+int counter = 0;
+while (counter < numBuckets) {
+  paths.add(versionedDataPath + "/" + counter);
+  if (counter == numBuckets - 1) {
+// Special treatment for the last bucket
+buckets.add(
+Arrays.copyOfRange(compressedRecord, ptr, ptr + 
compressedRecord.length % _bucketSize));
+  } else {
+buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + 
_bucketSize));
+  }
+  ptr += _bucketSize;
+  counter++;
+}
 
-// Do a cleanup of previous data
-if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
-  // Clean-up is not critical so upon failure, we log instead of 
throwing an exception
-  LOG.warn("Failed to clean up previous bucketed data in data path: 
{}", 

[GitHub] [helix] narendly commented on a change in pull request #506: Increase parallelism for ZkBucketDataAccessor

2019-10-10 Thread GitBox
narendly commented on a change in pull request #506: Increase parallelism for 
ZkBucketDataAccessor
URL: https://github.com/apache/helix/pull/506#discussion_r333772807
 
 

 ##
 File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 ##
 @@ -202,126 +214,130 @@ public void disconnect() {
 if (!_zkClient.isClosed()) {
   _zkClient.close();
 }
-if (!_znRecordClient.isClosed()) {
-  _znRecordClient.close();
-}
   }
 
   private HelixProperty compressedBucketRead(String path) {
-// TODO: Incorporate parallelism into reads instead of locking the whole 
thing against other
-// reads and writes
-if (tryLock(path)) {
-  try {
-// Retrieve the metadata
-ZNRecord metadataRecord =
-_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-if (metadataRecord == null) {
-  throw new ZkNoNodeException(
-  String.format("Metadata ZNRecord does not exist for path: %s", 
path));
-}
-
-int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, 
-1);
-if (lastSuccessIndex == -1) {
-  throw new HelixException(String.format("Metadata ZNRecord does not 
have %s! Path: %s",
-  LAST_SUCCESS_KEY, path));
-}
-if (bucketSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
-}
-if (dataSize == -1) {
-  throw new HelixException(
-  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
-}
-
-// Compute N - number of buckets
-int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-byte[] compressedRecord = new byte[dataSize];
-String dataPath = path + "/" + lastSuccessIndex;
+// 1. Get the version to read
+byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + 
LAST_SUCCESSFUL_WRITE_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryVersionToRead == null) {
+  throw new ZkNoNodeException(
+  String.format("Last successful write ZNode does not exist for path: 
%s", path));
+}
+long versionToRead = Longs.fromByteArray(binaryVersionToRead);
+
+// 2. Get the metadata map
+byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead 
+ "/" + METADATA_KEY,
+null, AccessOption.PERSISTENT);
+if (binaryMetadata == null) {
+  throw new ZkNoNodeException(
+  String.format("Metadata ZNode does not exist for path: %s", path));
+}
+Map metadata;
+try {
+  metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+} catch (IOException e) {
+  throw new HelixException(String.format("Failed to deserialize path 
metadata: %s!", path), e);
+}
 
-List paths = new ArrayList<>();
-for (int i = 0; i < numBuckets; i++) {
-  paths.add(dataPath + "/" + i);
-}
+// 3. Read the data
+Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+if (bucketSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
BUCKET_SIZE_KEY, path));
+}
+if (dataSizeObj == null) {
+  throw new HelixException(
+  String.format("Metadata ZNRecord does not have %s! Path: %s", 
DATA_SIZE_KEY, path));
+}
+int bucketSize = Integer.parseInt((String) bucketSizeObj);
+int dataSize = Integer.parseInt((String) dataSizeObj);
 
-// Async get
-List buckets = _zkBaseDataAccessor.get(paths, null, 
AccessOption.PERSISTENT, true);
-
-// Combine buckets into one byte array
-int copyPtr = 0;
-for (int i = 0; i < numBuckets; i++) {
-  if (i == numBuckets - 1) {
-// Special treatment for the last bucket
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
dataSize % bucketSize);
-  } else {
-System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, 
bucketSize);
-copyPtr += bucketSize;
-  }
-}
+// Compute N - number of buckets
+int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+byte[] compressedRecord = new byte[dataSize];
+String dataPath = path + "/" + versionToRead;
 
-// Decompress the byte array
-ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(compressedRecord);
-byte[] serializedRecord;
-try {
-  serializedRecord = 
GZipCompressionUtil.uncompress(byteArrayInputStream);
-} catch (IOException e) {
-  throw new HelixException(String.format("Failed to decompress path: